From 641e89ef0e15c9edde69f3b8cf82c7dd5f68687a Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Wed, 30 Sep 2015 14:28:07 +0000
Subject: [PATCH] OPENDJ-2016: New on disk merge import strategy based on storage engine.

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportLDIFReader.java |  140 +++++++++++++++++++++++++---------------------
 1 files changed, 75 insertions(+), 65 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportLDIFReader.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportLDIFReader.java
index 5961507..cdfdf05 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportLDIFReader.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportLDIFReader.java
@@ -32,6 +32,8 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.LocalizableMessageBuilder;
@@ -50,6 +52,8 @@
 /** This class specializes the LDIFReader for imports. */
 final class ImportLDIFReader extends LDIFReader
 {
+  private final ConcurrentHashMap<DN, CountDownLatch> pendingMap = new ConcurrentHashMap<>();
+
   /**
    * A class holding the entry, its entryID as assigned by the LDIF reader and its suffix as
    * determined by the LDIF reader.
@@ -58,13 +62,13 @@
   {
     private final Entry entry;
     private final EntryID entryID;
-    private final Suffix suffix;
+    private final EntryContainer entryContainer;
 
-    private EntryInformation(Entry entry, EntryID entryID, Suffix suffix)
+    EntryInformation(Entry entry, EntryID entryID, EntryContainer entryContainer)
     {
       this.entry = entry;
       this.entryID = entryID;
-      this.suffix = suffix;
+      this.entryContainer = entryContainer;
     }
 
     Entry getEntry()
@@ -77,9 +81,9 @@
       return entryID;
     }
 
-    Suffix getSuffix()
+    EntryContainer getEntryContainer()
     {
-      return suffix;
+      return entryContainer;
     }
   }
 
@@ -108,15 +112,15 @@
    * Reads the next entry from the LDIF source.
    *
    * @return The next entry information read from the LDIF source, or <CODE>null</CODE> if the end of the LDIF
-   *         data is reached.
+   *         data is reached of if the import has been cancelled.
    * @param suffixesMap
-   *          A map of suffixes instances.
+   *          A map of entry containers instances.
    * @throws IOException
    *           If an I/O problem occurs while reading from the file.
    * @throws LDIFException
    *           If the information read cannot be parsed as an LDIF entry.
    */
-  public final EntryInformation readEntry(Map<DN, Suffix> suffixesMap) throws IOException, LDIFException
+  public final EntryInformation readEntry(Map<DN, EntryContainer> suffixesMap) throws IOException, LDIFException
   {
     final boolean checkSchema = importConfig.validateSchema();
     while (true)
@@ -124,7 +128,7 @@
       LinkedList<StringBuilder> lines;
       DN entryDN;
       EntryID entryID;
-      Suffix suffix;
+      final EntryContainer entryContainer;
       synchronized (this)
       {
         // Read the set of lines that make up the next entry.
@@ -155,16 +159,8 @@
           // read and return the next entry.
           continue;
         }
-        else if (!importConfig.includeEntry(entryDN))
-        {
-          logger.trace("Skipping entry %s because the DN is not one that "
-              + "should be included based on the include and exclude branches.", entryDN);
-          entriesRead.incrementAndGet();
-          logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
-          continue;
-        }
-        suffix = getMatchSuffix(entryDN, suffixesMap);
-        if (suffix == null)
+        entryContainer = getEntryContainer(entryDN, suffixesMap);
+        if (entryContainer == null)
         {
           logger.trace("Skipping entry %s because the DN is not one that "
               + "should be included based on a suffix match check.", entryDN);
@@ -174,23 +170,29 @@
         }
         entriesRead.incrementAndGet();
         entryID = rootContainer.getNextEntryID();
-        suffix.addPending(entryDN);
+
+        if (!addPending(entryDN))
+        {
+          logger.trace("Skipping entry %s because the DN already exists.", entryDN);
+          logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
+          continue;
+        }
       }
 
       // Create the entry and see if it is one that should be included in the import
-      final Entry entry = createEntry(lines, entryDN, checkSchema, suffix);
+      final Entry entry = createEntry(lines, entryDN, checkSchema);
       if (entry == null
-          || !isIncludedInImport(entry, suffix, lines)
-          || !invokeImportPlugins(entry, suffix, lines)
-          || (checkSchema && !isValidAgainstSchema(entry, suffix, lines)))
+          || !invokeImportPlugins(entry, lines)
+          || (checkSchema && !isValidAgainstSchema(entry, lines)))
       {
+        removePending(entryDN);
         continue;
       }
-      return new EntryInformation(entry, entryID, suffix);
+      return new EntryInformation(entry, entryID, entryContainer);
     }
   }
 
-  private Entry createEntry(List<StringBuilder> lines, DN entryDN, boolean checkSchema, Suffix suffix)
+  private Entry createEntry(List<StringBuilder> lines, DN entryDN, boolean checkSchema)
   {
     // Read the set of attributes from the entry.
     Map<ObjectClass, String> objectClasses = new HashMap<>();
@@ -210,7 +212,6 @@
         logger.trace("Skipping entry %s because reading" + "its attributes failed.", entryDN);
       }
       logToSkipWriter(lines, ERR_LDIF_READ_ATTR_SKIP.get(entryDN, e.getMessage()));
-      suffix.removePending(entryDN);
       return null;
     }
 
@@ -220,36 +221,7 @@
     return entry;
   }
 
-  private boolean isIncludedInImport(Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
-  {
-    final DN entryDN = entry.getName();
-    try
-    {
-      if (!importConfig.includeEntry(entry))
-      {
-        if (logger.isTraceEnabled())
-        {
-          logger.trace("Skipping entry %s because the DN is not one that "
-              + "should be included based on the include and exclude filters.", entryDN);
-        }
-        logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
-        suffix.removePending(entryDN);
-        return false;
-      }
-    }
-    catch (Exception e)
-    {
-      logger.traceException(e);
-      suffix.removePending(entryDN);
-      logToSkipWriter(lines,
-          ERR_LDIF_COULD_NOT_EVALUATE_FILTERS_FOR_IMPORT.get(entry.getName(), lastEntryLineNumber, e));
-      suffix.removePending(entryDN);
-      return false;
-    }
-    return true;
-  }
-
-  private boolean invokeImportPlugins(final Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
+  private boolean invokeImportPlugins(final Entry entry, LinkedList<StringBuilder> lines)
   {
     if (importConfig.invokeImportPlugins())
     {
@@ -269,14 +241,13 @@
         }
 
         logToRejectWriter(lines, m);
-        suffix.removePending(entryDN);
         return false;
       }
     }
     return true;
   }
 
-  private boolean isValidAgainstSchema(Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
+  private boolean isValidAgainstSchema(Entry entry, LinkedList<StringBuilder> lines)
   {
     final DN entryDN = entry.getName();
     addRDNAttributesIfNecessary(entryDN, entry.getUserAttributes(), entry.getOperationalAttributes());
@@ -288,7 +259,6 @@
     {
       LocalizableMessage message = ERR_LDIF_SCHEMA_VIOLATION.get(entryDN, lastEntryLineNumber, invalidReason);
       logToRejectWriter(lines, message);
-      suffix.removePending(entryDN);
       return false;
     }
     return true;
@@ -301,20 +271,60 @@
    *          The DN to search for.
    * @param map
    *          The map to search.
-   * @return The suffix instance that matches the DN, or null if no match is found.
+   * @return The entry container instance that matches the DN, or null if no match is found.
    */
-  private Suffix getMatchSuffix(DN dn, Map<DN, Suffix> map)
+  private EntryContainer getEntryContainer(DN dn, Map<DN, EntryContainer> map)
   {
     DN nodeDN = dn;
     while (nodeDN != null)
     {
-      final Suffix suffix = map.get(nodeDN);
-      if (suffix != null)
+      final EntryContainer entryContainer = map.get(nodeDN);
+      if (entryContainer != null)
       {
-        return suffix;
+        return entryContainer;
       }
       nodeDN = nodeDN.getParentDNInSuffix();
     }
     return null;
   }
+
+  /**
+   * Make sure the specified parent DN is not in the pending map.
+   *
+   * @param parentDN The DN of the parent.
+   */
+  void waitIfPending(DN parentDN)  throws InterruptedException
+  {
+    final CountDownLatch l = pendingMap.get(parentDN);
+    if (l != null)
+    {
+      l.await();
+    }
+  }
+
+  /**
+   * Add specified DN to the pending map.
+   *
+   * @param dn The DN to add to the map.
+   * @return true if the DN was added, false if the DN is already present.
+   */
+  private boolean addPending(DN dn)
+  {
+    return pendingMap.putIfAbsent(dn, new CountDownLatch(1)) == null;
+  }
+
+  /**
+   * Remove the specified DN from the pending map, it may not exist if the
+   * entries are being migrated so just return.
+   *
+   * @param dn The DN to remove from the map.
+   */
+  void removePending(DN dn)
+  {
+    CountDownLatch l = pendingMap.remove(dn);
+    if(l != null)
+    {
+      l.countDown();
+    }
+  }
 }

--
Gitblit v1.10.0