From 641e89ef0e15c9edde69f3b8cf82c7dd5f68687a Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Wed, 30 Sep 2015 14:28:07 +0000
Subject: [PATCH] OPENDJ-2016: New on disk merge import strategy based on storage engine.
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportLDIFReader.java | 140 +++++++++++++++++++++++++---------------------
1 files changed, 75 insertions(+), 65 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportLDIFReader.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportLDIFReader.java
index 5961507..cdfdf05 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportLDIFReader.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportLDIFReader.java
@@ -32,6 +32,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.LocalizableMessageBuilder;
@@ -50,6 +52,8 @@
/** This class specializes the LDIFReader for imports. */
final class ImportLDIFReader extends LDIFReader
{
+ private final ConcurrentHashMap<DN, CountDownLatch> pendingMap = new ConcurrentHashMap<>();
+
/**
* A class holding the entry, its entryID as assigned by the LDIF reader and its suffix as
* determined by the LDIF reader.
@@ -58,13 +62,13 @@
{
private final Entry entry;
private final EntryID entryID;
- private final Suffix suffix;
+ private final EntryContainer entryContainer;
- private EntryInformation(Entry entry, EntryID entryID, Suffix suffix)
+ EntryInformation(Entry entry, EntryID entryID, EntryContainer entryContainer)
{
this.entry = entry;
this.entryID = entryID;
- this.suffix = suffix;
+ this.entryContainer = entryContainer;
}
Entry getEntry()
@@ -77,9 +81,9 @@
return entryID;
}
- Suffix getSuffix()
+ EntryContainer getEntryContainer()
{
- return suffix;
+ return entryContainer;
}
}
@@ -108,15 +112,15 @@
* Reads the next entry from the LDIF source.
*
* @return The next entry information read from the LDIF source, or <CODE>null</CODE> if the end of the LDIF
- * data is reached.
+ * data is reached of if the import has been cancelled.
* @param suffixesMap
- * A map of suffixes instances.
+ * A map of entry containers instances.
* @throws IOException
* If an I/O problem occurs while reading from the file.
* @throws LDIFException
* If the information read cannot be parsed as an LDIF entry.
*/
- public final EntryInformation readEntry(Map<DN, Suffix> suffixesMap) throws IOException, LDIFException
+ public final EntryInformation readEntry(Map<DN, EntryContainer> suffixesMap) throws IOException, LDIFException
{
final boolean checkSchema = importConfig.validateSchema();
while (true)
@@ -124,7 +128,7 @@
LinkedList<StringBuilder> lines;
DN entryDN;
EntryID entryID;
- Suffix suffix;
+ final EntryContainer entryContainer;
synchronized (this)
{
// Read the set of lines that make up the next entry.
@@ -155,16 +159,8 @@
// read and return the next entry.
continue;
}
- else if (!importConfig.includeEntry(entryDN))
- {
- logger.trace("Skipping entry %s because the DN is not one that "
- + "should be included based on the include and exclude branches.", entryDN);
- entriesRead.incrementAndGet();
- logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
- continue;
- }
- suffix = getMatchSuffix(entryDN, suffixesMap);
- if (suffix == null)
+ entryContainer = getEntryContainer(entryDN, suffixesMap);
+ if (entryContainer == null)
{
logger.trace("Skipping entry %s because the DN is not one that "
+ "should be included based on a suffix match check.", entryDN);
@@ -174,23 +170,29 @@
}
entriesRead.incrementAndGet();
entryID = rootContainer.getNextEntryID();
- suffix.addPending(entryDN);
+
+ if (!addPending(entryDN))
+ {
+ logger.trace("Skipping entry %s because the DN already exists.", entryDN);
+ logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
+ continue;
+ }
}
// Create the entry and see if it is one that should be included in the import
- final Entry entry = createEntry(lines, entryDN, checkSchema, suffix);
+ final Entry entry = createEntry(lines, entryDN, checkSchema);
if (entry == null
- || !isIncludedInImport(entry, suffix, lines)
- || !invokeImportPlugins(entry, suffix, lines)
- || (checkSchema && !isValidAgainstSchema(entry, suffix, lines)))
+ || !invokeImportPlugins(entry, lines)
+ || (checkSchema && !isValidAgainstSchema(entry, lines)))
{
+ removePending(entryDN);
continue;
}
- return new EntryInformation(entry, entryID, suffix);
+ return new EntryInformation(entry, entryID, entryContainer);
}
}
- private Entry createEntry(List<StringBuilder> lines, DN entryDN, boolean checkSchema, Suffix suffix)
+ private Entry createEntry(List<StringBuilder> lines, DN entryDN, boolean checkSchema)
{
// Read the set of attributes from the entry.
Map<ObjectClass, String> objectClasses = new HashMap<>();
@@ -210,7 +212,6 @@
logger.trace("Skipping entry %s because reading" + "its attributes failed.", entryDN);
}
logToSkipWriter(lines, ERR_LDIF_READ_ATTR_SKIP.get(entryDN, e.getMessage()));
- suffix.removePending(entryDN);
return null;
}
@@ -220,36 +221,7 @@
return entry;
}
- private boolean isIncludedInImport(Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
- {
- final DN entryDN = entry.getName();
- try
- {
- if (!importConfig.includeEntry(entry))
- {
- if (logger.isTraceEnabled())
- {
- logger.trace("Skipping entry %s because the DN is not one that "
- + "should be included based on the include and exclude filters.", entryDN);
- }
- logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
- suffix.removePending(entryDN);
- return false;
- }
- }
- catch (Exception e)
- {
- logger.traceException(e);
- suffix.removePending(entryDN);
- logToSkipWriter(lines,
- ERR_LDIF_COULD_NOT_EVALUATE_FILTERS_FOR_IMPORT.get(entry.getName(), lastEntryLineNumber, e));
- suffix.removePending(entryDN);
- return false;
- }
- return true;
- }
-
- private boolean invokeImportPlugins(final Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
+ private boolean invokeImportPlugins(final Entry entry, LinkedList<StringBuilder> lines)
{
if (importConfig.invokeImportPlugins())
{
@@ -269,14 +241,13 @@
}
logToRejectWriter(lines, m);
- suffix.removePending(entryDN);
return false;
}
}
return true;
}
- private boolean isValidAgainstSchema(Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
+ private boolean isValidAgainstSchema(Entry entry, LinkedList<StringBuilder> lines)
{
final DN entryDN = entry.getName();
addRDNAttributesIfNecessary(entryDN, entry.getUserAttributes(), entry.getOperationalAttributes());
@@ -288,7 +259,6 @@
{
LocalizableMessage message = ERR_LDIF_SCHEMA_VIOLATION.get(entryDN, lastEntryLineNumber, invalidReason);
logToRejectWriter(lines, message);
- suffix.removePending(entryDN);
return false;
}
return true;
@@ -301,20 +271,60 @@
* The DN to search for.
* @param map
* The map to search.
- * @return The suffix instance that matches the DN, or null if no match is found.
+ * @return The entry container instance that matches the DN, or null if no match is found.
*/
- private Suffix getMatchSuffix(DN dn, Map<DN, Suffix> map)
+ private EntryContainer getEntryContainer(DN dn, Map<DN, EntryContainer> map)
{
DN nodeDN = dn;
while (nodeDN != null)
{
- final Suffix suffix = map.get(nodeDN);
- if (suffix != null)
+ final EntryContainer entryContainer = map.get(nodeDN);
+ if (entryContainer != null)
{
- return suffix;
+ return entryContainer;
}
nodeDN = nodeDN.getParentDNInSuffix();
}
return null;
}
+
+ /**
+ * Make sure the specified parent DN is not in the pending map.
+ *
+ * @param parentDN The DN of the parent.
+ */
+ void waitIfPending(DN parentDN) throws InterruptedException
+ {
+ final CountDownLatch l = pendingMap.get(parentDN);
+ if (l != null)
+ {
+ l.await();
+ }
+ }
+
+ /**
+ * Add specified DN to the pending map.
+ *
+ * @param dn The DN to add to the map.
+ * @return true if the DN was added, false if the DN is already present.
+ */
+ private boolean addPending(DN dn)
+ {
+ return pendingMap.putIfAbsent(dn, new CountDownLatch(1)) == null;
+ }
+
+ /**
+ * Remove the specified DN from the pending map, it may not exist if the
+ * entries are being migrated so just return.
+ *
+ * @param dn The DN to remove from the map.
+ */
+ void removePending(DN dn)
+ {
+ CountDownLatch l = pendingMap.remove(dn);
+ if(l != null)
+ {
+ l.countDown();
+ }
+ }
}
--
Gitblit v1.10.0