From 473991b04ba86d4ab011bde576280a27f2659d09 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 03 Feb 2015 14:50:58 +0000
Subject: [PATCH] Remove org.opends.server.backends.jeb.importLDIF package
---
opendj3-server-dev/src/server/org/opends/server/backends/jeb/IndexInputBuffer.java | 403 +++
opendj3-server-dev/src/server/org/opends/server/backends/jeb/Index.java | 1
opendj3-server-dev/src/server/org/opends/server/backends/jeb/ImportIDSet.java | 477 ++++
opendj3-server-dev/src/server/org/opends/server/backends/jeb/ImportLDIFReader.java | 273 ++
opendj3-server-dev/src/server/org/opends/server/backends/jeb/Suffix.java | 369 +++
opendj3-server-dev/src/server/org/opends/server/backends/jeb/NullIndex.java | 1
opendj3-server-dev/src/server/org/opends/server/backends/jeb/BackendImpl.java | 3
opendj3-server-dev/src/server/org/opends/server/backends/jeb/Importer.java | 4416 ++++++++++++++++++++++++++++++++++++++
opendj3-server-dev/src/server/org/opends/server/backends/jeb/IndexOutputBuffer.java | 999 ++++++++
9 files changed, 6,938 insertions(+), 4 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/BackendImpl.java b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/BackendImpl.java
index 8184f46..1048b25 100644
--- a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/BackendImpl.java
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/BackendImpl.java
@@ -58,7 +58,6 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.backends.RebuildConfig;
import org.opends.server.backends.VerifyConfig;
-import org.opends.server.backends.jeb.importLDIF.Importer;
import org.opends.server.core.*;
import org.opends.server.extensions.DiskSpaceMonitor;
import org.opends.server.types.*;
@@ -1157,7 +1156,7 @@
* @param e The DatabaseException to be converted.
* @return DirectoryException created from exception.
*/
- DirectoryException createDirectoryException(DatabaseException e) {
+ private DirectoryException createDirectoryException(DatabaseException e) {
if (e instanceof EnvironmentFailureException && !rootContainer.isValid()) {
LocalizableMessage message = NOTE_BACKEND_ENVIRONMENT_UNUSABLE.get(getBackendID());
logger.info(message);
diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/ImportIDSet.java b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/ImportIDSet.java
new file mode 100644
index 0000000..45705d4
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/ImportIDSet.java
@@ -0,0 +1,477 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2009 Sun Microsystems, Inc.
+ * Portions Copyright 2011-2015 ForgeRock AS
+ */
+package org.opends.server.backends.jeb;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class manages the set of ID that are to be eventually added to an index
+ * database. It is responsible for determining if the number of IDs is above
+ * the configured ID limit. If the limit it reached, the class stops tracking
+ * individual IDs and marks the set as undefined. This class is not thread safe.
+ */
+class ImportIDSet {
+
+ /** The internal array where elements are stored. */
+ private long[] array;
+ /** The number of valid elements in the array. */
+ private int count;
+ /** Boolean to keep track if the instance is defined or not. */
+ private boolean isDefined = true;
+ /** Size of the undefined if count is kept. */
+ private long undefinedSize;
+ /** Key related to an ID set. */
+ private ByteBuffer key;
+ /** The entry limit size. */
+ private final int limit;
+ /** Set to true if a count of ids above the entry limit should be kept. */
+ private final boolean doCount;
+
+ /**
+ * Create an import ID set of the specified size, index limit and index
+ * maintain count, plus an extra 128 slots.
+ *
+ * @param size The size of the the underlying array, plus some extra space.
+ * @param limit The index entry limit.
+ * @param doCount The index maintain count.
+ */
+ public ImportIDSet(int size, int limit, boolean doCount)
+ {
+ this.array = new long[size + 128];
+ // A limit of 0 means unlimited.
+ this.limit = limit == 0 ? Integer.MAX_VALUE : limit;
+ this.doCount = doCount;
+ }
+
+ /** Create an empty import instance. */
+ public ImportIDSet()
+ {
+ this.limit = -1;
+ this.doCount = false;
+ }
+
+ /**
+ * Clear the set so it can be reused again. The boolean indexParam specifies
+ * if the index parameters should be cleared also.
+ */
+ public void clear()
+ {
+ undefinedSize = 0;
+ isDefined = true;
+ count = 0;
+ }
+
+ /**
+ * Return if an import ID set is defined or not.
+ *
+ * @return <CODE>True</CODE> if an import ID set is defined.
+ */
+ public boolean isDefined()
+ {
+ return isDefined;
+ }
+
+ /** Set an import ID set to undefined. */
+ private void setUndefined() {
+ array = null;
+ isDefined = false;
+ }
+
+ /**
+ * Add the specified entry id to an import ID set.
+ *
+ * @param entryID The entry ID to add to an import ID set.
+ */
+ void addEntryID(EntryID entryID) {
+ addEntryID(entryID.longValue());
+ }
+
+ /**
+ * Add the specified long value to an import ID set.
+ *
+ * @param l The long value to add to an import ID set.
+ */
+ void addEntryID(long l) {
+ if(!isDefined()) {
+ if(doCount) {
+ undefinedSize++;
+ }
+ return;
+ }
+ if (l < 0 || (isDefined() && count + 1 > limit))
+ {
+ setUndefined();
+ if(doCount) {
+ undefinedSize = count + 1;
+ } else {
+ undefinedSize = Long.MAX_VALUE;
+ }
+ count = 0;
+ } else {
+ add(l);
+ }
+ }
+
+ private boolean mergeCount(byte[] dBbytes, ImportIDSet importIdSet) {
+ boolean incrementLimitCount=false;
+ boolean dbUndefined = isDBUndefined(dBbytes);
+
+ if (dbUndefined && !importIdSet.isDefined()) {
+ undefinedSize = JebFormat.entryIDUndefinedSizeFromDatabase(dBbytes) +
+ importIdSet.undefinedSize;
+ isDefined=false;
+ } else if (dbUndefined && importIdSet.isDefined()) {
+ undefinedSize = JebFormat.entryIDUndefinedSizeFromDatabase(dBbytes) +
+ importIdSet.size();
+ isDefined=false;
+ } else if(!importIdSet.isDefined()) {
+ int dbSize = JebFormat.entryIDListFromDatabase(dBbytes).length;
+ undefinedSize = dbSize + importIdSet.undefinedSize;
+ isDefined = false;
+ incrementLimitCount = true;
+ } else {
+ array = JebFormat.entryIDListFromDatabase(dBbytes);
+ if(array.length + importIdSet.size() > limit) {
+ undefinedSize = array.length + importIdSet.size();
+ isDefined=false;
+ incrementLimitCount=true;
+ } else {
+ count = array.length;
+ addAll(importIdSet);
+ }
+ }
+ return incrementLimitCount;
+ }
+
+ /**
+ * Remove the specified import ID set from the byte array read from the DB.
+ *
+ * @param bytes The byte array read from JEB.
+ * @param importIdSet The import ID set to delete.
+ */
+ public void remove(byte[] bytes, ImportIDSet importIdSet)
+ {
+ if (isDBUndefined(bytes)) {
+ isDefined=false;
+ importIdSet.setUndefined();
+ undefinedSize = Long.MAX_VALUE;
+ } else if(!importIdSet.isDefined()) {
+ isDefined=false;
+ undefinedSize = Long.MAX_VALUE;
+ } else {
+ array = JebFormat.entryIDListFromDatabase(bytes);
+ if(array.length - importIdSet.size() > limit) {
+ isDefined=false;
+ count = 0;
+ importIdSet.setUndefined();
+ undefinedSize = Long.MAX_VALUE;
+ } else {
+ count = array.length;
+ removeAll(importIdSet);
+ }
+ }
+ }
+
+ /**
+ * Merge the specified byte array read from a DB, with the specified import
+ * ID set. The specified limit and maintain count parameters define
+ * if the newly merged set is defined or not.
+ *
+ * @param bytes The byte array of IDs read from a DB.
+ * @param importIdSet The import ID set to merge the byte array with.
+ * @return <CODE>True</CODE> if the import ID set started keeping a count as
+ * a result of the merge.
+ */
+ public boolean merge(byte[] bytes, ImportIDSet importIdSet)
+ {
+ boolean incrementLimitCount=false;
+ if(doCount) {
+ incrementLimitCount = mergeCount(bytes, importIdSet);
+ } else if (isDBUndefined(bytes)) {
+ isDefined = false;
+ importIdSet.setUndefined();
+ undefinedSize = Long.MAX_VALUE;
+ count = 0;
+ } else if(!importIdSet.isDefined()) {
+ isDefined = false;
+ incrementLimitCount = true;
+ undefinedSize = Long.MAX_VALUE;
+ count = 0;
+ } else {
+ array = JebFormat.entryIDListFromDatabase(bytes);
+ if (array.length + importIdSet.size() > limit) {
+ isDefined = false;
+ incrementLimitCount = true;
+ count = 0;
+ importIdSet.setUndefined();
+ undefinedSize = Long.MAX_VALUE;
+ } else {
+ count = array.length;
+ addAll(importIdSet);
+ }
+ }
+ return incrementLimitCount;
+ }
+
+ private boolean isDBUndefined(byte[] bytes)
+ {
+ return (bytes[0] & 0x80) == 0x80;
+ }
+
+ private void removeAll(ImportIDSet that) {
+ long[] newArray = new long[array.length];
+ int c = 0;
+ for(int i=0; i < count; i++)
+ {
+ if(binarySearch(that.array, that.count, array[i]) < 0)
+ {
+ newArray[c++] = array[i];
+ }
+ }
+ array = newArray;
+ count = c;
+ }
+
+ private void addAll(ImportIDSet that) {
+ resize(this.count+that.count);
+
+ if (that.count == 0)
+ {
+ return;
+ }
+
+ // Optimize for the case where the two sets are sure to have no overlap.
+ if (this.count == 0 || that.array[0] > this.array[this.count-1])
+ {
+ System.arraycopy(that.array, 0, this.array, this.count, that.count);
+ count += that.count;
+ return;
+ }
+
+ if (this.array[0] > that.array[that.count-1])
+ {
+ System.arraycopy(this.array, 0, this.array, that.count, this.count);
+ System.arraycopy(that.array, 0, this.array, 0, that.count);
+ count += that.count;
+ return;
+ }
+
+ int destPos = binarySearch(this.array, this.count, that.array[0]);
+ if (destPos < 0)
+ {
+ destPos = -(destPos+1);
+ }
+
+ // Make space for the copy.
+ int aCount = this.count - destPos;
+ int aPos = destPos + that.count;
+ int aEnd = aPos + aCount;
+ System.arraycopy(this.array, destPos, this.array, aPos, aCount);
+
+ // Optimize for the case where there is no overlap.
+ if (this.array[aPos] > that.array[that.count-1])
+ {
+ System.arraycopy(that.array, 0, this.array, destPos, that.count);
+ count += that.count;
+ return;
+ }
+
+ int bPos;
+ for ( bPos = 0; aPos < aEnd && bPos < that.count; )
+ {
+ if ( this.array[aPos] < that.array[bPos] )
+ {
+ this.array[destPos++] = this.array[aPos++];
+ }
+ else if ( this.array[aPos] > that.array[bPos] )
+ {
+ this.array[destPos++] = that.array[bPos++];
+ }
+ else
+ {
+ this.array[destPos++] = this.array[aPos++];
+ bPos++;
+ }
+ }
+
+ // Copy any remainder.
+ int aRemain = aEnd - aPos;
+ if (aRemain > 0)
+ {
+ System.arraycopy(this.array, aPos, this.array, destPos, aRemain);
+ destPos += aRemain;
+ }
+
+ int bRemain = that.count - bPos;
+ if (bRemain > 0)
+ {
+ System.arraycopy(that.array, bPos, this.array, destPos, bRemain);
+ destPos += bRemain;
+ }
+
+ count = destPos;
+ }
+
+ /**
+ * Return the number of IDs in an import ID set.
+ *
+ * @return The current size of an import ID set.
+ */
+ public int size()
+ {
+ return count;
+ }
+
+ private boolean add(long v)
+ {
+ resize(count+1);
+
+ if (count == 0 || v > array[count-1])
+ {
+ array[count++] = v;
+ return true;
+ }
+
+ int pos = binarySearch(array, count, v);
+ if (pos >=0)
+ {
+ return false;
+ }
+
+ // For a negative return value r, the index -(r+1) gives the array
+ // index at which the specified value can be inserted to maintain
+ // the sorted order of the array.
+ pos = -(pos+1);
+
+ System.arraycopy(array, pos, array, pos+1, count-pos);
+ array[pos] = v;
+ count++;
+ return true;
+ }
+
+ private static int binarySearch(long[] a, int count, long key)
+ {
+ int low = 0;
+ int high = count-1;
+
+ while (low <= high)
+ {
+ int mid = low + high >> 1;
+ long midVal = a[mid];
+
+ if (midVal < key)
+ {
+ low = mid + 1;
+ }
+ else if (midVal > key)
+ {
+ high = mid - 1;
+ }
+ else
+ {
+ return mid; // key found
+ }
+ }
+ return -(low + 1); // key not found.
+ }
+
+ private void resize(int size)
+ {
+ if (array == null)
+ {
+ array = new long[size];
+ }
+ else if (array.length < size)
+ {
+ // Expand the size of the array in powers of two.
+ int newSize = array.length == 0 ? 1 : array.length;
+ do
+ {
+ newSize *= 2;
+ } while (newSize < size);
+
+ long[] newBytes = new long[newSize];
+ System.arraycopy(array, 0, newBytes, 0, count);
+ array = newBytes;
+ }
+ }
+
+ /**
+ * Create a byte array suitable to write to a JEB DB from an import ID set.
+ *
+ * @return A byte array suitable for writing to a JEB DB.
+ */
+ public byte[] toDatabase()
+ {
+ if(isDefined) {
+ return encode();
+ } else {
+ return JebFormat.entryIDUndefinedSizeToDatabase(undefinedSize);
+ }
+ }
+
+ private byte[] encode()
+ {
+ final int encodedSize = count * 8;
+ final byte[] bytes = new byte[encodedSize];
+ int pos = 0;
+ for (int i = 0; i < count; i++) {
+ final long id = array[i] & 0x00ffffffffL; // JNR: why is this necessary?
+
+ // encode the entryID
+ bytes[pos++] = (byte) ((id >>> 56) & 0xFF);
+ bytes[pos++] = (byte) ((id >>> 48) & 0xFF);
+ bytes[pos++] = (byte) ((id >>> 40) & 0xFF);
+ bytes[pos++] = (byte) ((id >>> 32) & 0xFF);
+ bytes[pos++] = (byte) ((id >>> 24) & 0xFF);
+ bytes[pos++] = (byte) ((id >>> 16) & 0xFF);
+ bytes[pos++] = (byte) ((id >>> 8) & 0xFF);
+ bytes[pos++] = (byte) (id & 0xFF);
+ }
+ return bytes;
+ }
+
+ /**
+ * Set the DB key related to an import ID set.
+ *
+ * @param key Byte array containing the key.
+ */
+ public void setKey(ByteBuffer key)
+ {
+ this.key = key;
+ }
+
+ /**
+ * Return the DB key related to an import ID set.
+ *
+ * @return The byte array containing the key.
+ */
+ public ByteBuffer getKey()
+ {
+ return key;
+ }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/ImportLDIFReader.java b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/ImportLDIFReader.java
new file mode 100644
index 0000000..6f9f2dc
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/ImportLDIFReader.java
@@ -0,0 +1,273 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2015 ForgeRock AS
+ */
+package org.opends.server.backends.jeb;
+
+import static org.opends.messages.UtilityMessages.*;
+import static org.opends.server.util.StaticUtils.*;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.i18n.LocalizableMessageBuilder;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.util.Reject;
+import org.opends.server.api.plugin.PluginResult;
+import org.opends.server.types.AttributeBuilder;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
+import org.opends.server.types.LDIFImportConfig;
+import org.opends.server.types.ObjectClass;
+import org.opends.server.util.LDIFException;
+import org.opends.server.util.LDIFReader;
+
+/**
+ * This class specializes the LDIFReader for imports.
+ */
+final class ImportLDIFReader extends LDIFReader
+{
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+ private final RootContainer rootContainer;
+
+ /**
+ * Creates a new LDIF reader that will read information from the specified file.
+ *
+ * @param importConfig
+ * The import configuration for this LDIF reader. It must not be <CODE>null</CODE>.
+ * @param rootContainer
+ * The root container needed to get the next entry ID.
+ * @throws IOException
+ * If a problem occurs while opening the LDIF file for reading.
+ */
+ public ImportLDIFReader(LDIFImportConfig importConfig, RootContainer rootContainer) throws IOException
+ {
+ super(importConfig);
+ Reject.ifNull(importConfig, rootContainer);
+ this.rootContainer = rootContainer;
+ }
+
+ /**
+ * Reads the next entry from the LDIF source.
+ *
+ * @return The next entry read from the LDIF source, or <CODE>null</CODE> if the end of the LDIF
+ * data is reached.
+ * @param suffixesMap
+ * A map of suffixes instances.
+ * @param entryInfo
+ * A object to hold information about the entry ID and what suffix was selected.
+ * @throws IOException
+ * If an I/O problem occurs while reading from the file.
+ * @throws LDIFException
+ * If the information read cannot be parsed as an LDIF entry.
+ */
+ public final Entry readEntry(Map<DN, Suffix> suffixesMap, Importer.EntryInformation entryInfo) throws IOException,
+ LDIFException
+ {
+ final boolean checkSchema = importConfig.validateSchema();
+ while (true)
+ {
+ LinkedList<StringBuilder> lines;
+ DN entryDN;
+ EntryID entryID;
+ Suffix suffix;
+ synchronized (this)
+ {
+ // Read the set of lines that make up the next entry.
+ lines = readEntryLines();
+ if (lines == null)
+ {
+ return null;
+ }
+ lastEntryBodyLines = lines;
+ lastEntryHeaderLines = new LinkedList<StringBuilder>();
+
+ // Read the DN of the entry and see if it is one that should be included
+ // in the import.
+ try
+ {
+ entryDN = readDN(lines);
+ }
+ catch (LDIFException e)
+ {
+ logger.traceException(e);
+ continue;
+ }
+
+ if (entryDN == null)
+ {
+ // This should only happen if the LDIF starts with the "version:" line
+ // and has a blank line immediately after that. In that case, simply
+ // read and return the next entry.
+ continue;
+ }
+ else if (!importConfig.includeEntry(entryDN))
+ {
+ logger.trace("Skipping entry %s because the DN is not one that "
+ + "should be included based on the include and exclude branches.", entryDN);
+ entriesRead.incrementAndGet();
+ logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
+ continue;
+ }
+ suffix = Importer.getMatchSuffix(entryDN, suffixesMap);
+ if (suffix == null)
+ {
+ logger.trace("Skipping entry %s because the DN is not one that "
+ + "should be included based on a suffix match check.", entryDN);
+ entriesRead.incrementAndGet();
+ logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
+ continue;
+ }
+ entriesRead.incrementAndGet();
+ entryID = rootContainer.getNextEntryID();
+ suffix.addPending(entryDN);
+ }
+
+ // Create the entry and see if it is one that should be included in the import
+ final Entry entry = createEntry(lines, entryDN, checkSchema, suffix);
+ if (entry == null
+ || !isIncludedInImport(entry, suffix, lines)
+ || !invokeImportPlugins(entry, suffix, lines)
+ || (checkSchema && !isValidAgainstSchema(entry, suffix, lines)))
+ {
+ continue;
+ }
+ entryInfo.setEntryID(entryID);
+ entryInfo.setSuffix(suffix);
+ // The entry should be included in the import, so return it.
+ return entry;
+ }
+ }
+
+ private Entry createEntry(List<StringBuilder> lines, DN entryDN, boolean checkSchema, Suffix suffix)
+ {
+ // Read the set of attributes from the entry.
+ Map<ObjectClass, String> objectClasses = new HashMap<ObjectClass, String>();
+ Map<AttributeType, List<AttributeBuilder>> userAttrBuilders =
+ new HashMap<AttributeType, List<AttributeBuilder>>();
+ Map<AttributeType, List<AttributeBuilder>> operationalAttrBuilders =
+ new HashMap<AttributeType, List<AttributeBuilder>>();
+ try
+ {
+ for (StringBuilder line : lines)
+ {
+ readAttribute(lines, line, entryDN, objectClasses, userAttrBuilders, operationalAttrBuilders, checkSchema);
+ }
+ }
+ catch (LDIFException e)
+ {
+ if (logger.isTraceEnabled())
+ {
+ logger.trace("Skipping entry %s because reading" + "its attributes failed.", entryDN);
+ }
+ logToSkipWriter(lines, ERR_LDIF_READ_ATTR_SKIP.get(entryDN, e.getMessage()));
+ suffix.removePending(entryDN);
+ return null;
+ }
+
+ final Entry entry = new Entry(entryDN, objectClasses,
+ toAttributesMap(userAttrBuilders), toAttributesMap(operationalAttrBuilders));
+ logger.trace("readEntry(), created entry: %s", entry);
+ return entry;
+ }
+
+ private boolean isIncludedInImport(Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
+ {
+ final DN entryDN = entry.getName();
+ try
+ {
+ if (!importConfig.includeEntry(entry))
+ {
+ if (logger.isTraceEnabled())
+ {
+ logger.trace("Skipping entry %s because the DN is not one that "
+ + "should be included based on the include and exclude filters.", entryDN);
+ }
+ logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
+ suffix.removePending(entryDN);
+ return false;
+ }
+ }
+ catch (Exception e)
+ {
+ logger.traceException(e);
+ suffix.removePending(entryDN);
+ logToSkipWriter(lines,
+ ERR_LDIF_COULD_NOT_EVALUATE_FILTERS_FOR_IMPORT.get(entry.getName(), lastEntryLineNumber, e));
+ suffix.removePending(entryDN);
+ return false;
+ }
+ return true;
+ }
+
+ private boolean invokeImportPlugins(final Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
+ {
+ if (importConfig.invokeImportPlugins())
+ {
+ PluginResult.ImportLDIF pluginResult = pluginConfigManager.invokeLDIFImportPlugins(importConfig, entry);
+ if (!pluginResult.continueProcessing())
+ {
+ final DN entryDN = entry.getName();
+ LocalizableMessage m;
+ LocalizableMessage rejectMessage = pluginResult.getErrorMessage();
+ if (rejectMessage == null)
+ {
+ m = ERR_LDIF_REJECTED_BY_PLUGIN_NOMESSAGE.get(entryDN);
+ }
+ else
+ {
+ m = ERR_LDIF_REJECTED_BY_PLUGIN.get(entryDN, rejectMessage);
+ }
+
+ logToRejectWriter(lines, m);
+ suffix.removePending(entryDN);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isValidAgainstSchema(Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
+ {
+ final DN entryDN = entry.getName();
+ addRDNAttributesIfNecessary(entryDN, entry.getUserAttributes(), entry.getOperationalAttributes());
+ // Add any superior objectclass(s) missing in the objectclass map.
+ addSuperiorObjectClasses(entry.getObjectClasses());
+
+ LocalizableMessageBuilder invalidReason = new LocalizableMessageBuilder();
+ if (!entry.conformsToSchema(null, false, true, false, invalidReason))
+ {
+ LocalizableMessage message = ERR_LDIF_SCHEMA_VIOLATION.get(entryDN, lastEntryLineNumber, invalidReason);
+ logToRejectWriter(lines, message);
+ suffix.removePending(entryDN);
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/Importer.java b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/Importer.java
new file mode 100644
index 0000000..eec3412
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/Importer.java
@@ -0,0 +1,4416 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2008-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2011-2015 ForgeRock AS
+ */
+package org.opends.server.backends.jeb;
+
+import static com.sleepycat.je.EnvironmentConfig.*;
+
+import static org.opends.messages.JebMessages.*;
+import static org.opends.server.admin.std.meta.LocalDBIndexCfgDefn.IndexType.*;
+import static org.opends.server.util.DynamicConstants.*;
+import static org.opends.server.util.ServerConstants.*;
+import static org.opends.server.util.StaticUtils.*;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.i18n.LocalizableMessageDescriptor.Arg3;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ResultCode;
+import org.forgerock.opendj.ldap.spi.IndexingOptions;
+import org.forgerock.util.Utils;
+import org.opends.server.admin.std.meta.LocalDBIndexCfgDefn.IndexType;
+import org.opends.server.admin.std.server.LocalDBBackendCfg;
+import org.opends.server.admin.std.server.LocalDBIndexCfg;
+import org.opends.server.api.DiskSpaceMonitorHandler;
+import org.opends.server.backends.RebuildConfig;
+import org.opends.server.backends.RebuildConfig.RebuildMode;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.extensions.DiskSpaceMonitor;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Entry;
+import org.opends.server.types.InitializationException;
+import org.opends.server.types.LDIFImportConfig;
+import org.opends.server.types.LDIFImportResult;
+import org.opends.server.util.Platform;
+import org.opends.server.util.StaticUtils;
+
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.CursorConfig;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.DiskOrderedCursor;
+import com.sleepycat.je.DiskOrderedCursorConfig;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.EnvironmentStats;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.StatsConfig;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.util.PackedInteger;
+
+/**
+ * This class provides the engine that performs both importing of LDIF files and
+ * the rebuilding of indexes.
+ */
+final class Importer implements DiskSpaceMonitorHandler
+{
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+ private static final int TIMER_INTERVAL = 10000;
+ private static final int KB = 1024;
+ private static final int MB = KB * KB;
+ private static final String DEFAULT_TMP_DIR = "import-tmp";
+ private static final String TMPENV_DIR = "tmp-env";
+
+ /** Defaults for DB cache. */
+ private static final int MAX_DB_CACHE_SIZE = 8 * MB;
+ private static final int MAX_DB_LOG_SIZE = 10 * MB;
+ private static final int MIN_DB_CACHE_SIZE = 4 * MB;
+
+ /**
+ * Defaults for LDIF reader buffers, min memory required to import and default
+ * size for byte buffers.
+ */
+ private static final int READER_WRITER_BUFFER_SIZE = 8 * KB;
+ private static final int MIN_DB_CACHE_MEMORY = MAX_DB_CACHE_SIZE
+ + MAX_DB_LOG_SIZE;
+ private static final int BYTE_BUFFER_CAPACITY = 128;
+
+ /** Max size of phase one buffer. */
+ private static final int MAX_BUFFER_SIZE = 2 * MB;
+ /** Min size of phase one buffer. */
+ private static final int MIN_BUFFER_SIZE = 4 * KB;
+ /** Min size of phase two read-ahead cache. */
+ private static final int MIN_READ_AHEAD_CACHE_SIZE = 2 * KB;
+ /** Small heap threshold used to give more memory to JVM to attempt OOM errors. */
+ private static final int SMALL_HEAP_SIZE = 256 * MB;
+
+ /** The DN attribute type. */
+ private static final AttributeType dnType;
+ static final IndexOutputBuffer.IndexComparator indexComparator =
+ new IndexOutputBuffer.IndexComparator();
+
+ /** Phase one buffer count. */
+ private final AtomicInteger bufferCount = new AtomicInteger(0);
+ /** Phase one imported entries count. */
+ private final AtomicLong importCount = new AtomicLong(0);
+
+ /** Phase one buffer size in bytes. */
+ private int bufferSize;
+
+ /** Temp scratch directory. */
+ private final File tempDir;
+
+ /** Index count. */
+ private final int indexCount;
+ /** Thread count. */
+ private int threadCount;
+
+ /** Set to true when validation is skipped. */
+ private final boolean skipDNValidation;
+
+ /** Temporary environment used when DN validation is done in first phase. */
+ private final TmpEnv tmpEnv;
+
+ /** Root container. */
+ private RootContainer rootContainer;
+
+ /** Import configuration. */
+ private final LDIFImportConfig importConfiguration;
+ /** Backend configuration. */
+ private final LocalDBBackendCfg backendConfiguration;
+
+ /** LDIF reader. */
+ private ImportLDIFReader reader;
+
+ /** Migrated entry count. */
+ private int migratedCount;
+
+ /** Size in bytes of temporary env. */
+ private long tmpEnvCacheSize;
+ /** Available memory at the start of the import. */
+ private long availableMemory;
+ /** Size in bytes of DB cache. */
+ private long dbCacheSize;
+
+ /** The executor service used for the buffer sort tasks. */
+ private ExecutorService bufferSortService;
+ /** The executor service used for the scratch file processing tasks. */
+ private ExecutorService scratchFileWriterService;
+
+ /** Queue of free index buffers -- used to re-cycle index buffers. */
+ private final BlockingQueue<IndexOutputBuffer> freeBufferQueue =
+ new LinkedBlockingQueue<IndexOutputBuffer>();
+
+ /**
+ * Map of index keys to index buffers. Used to allocate sorted index buffers
+ * to a index writer thread.
+ */
+ private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueueMap =
+ new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>();
+
+ /** Map of DB containers to index managers. Used to start phase 2. */
+ private final List<IndexManager> indexMgrList =
+ new LinkedList<IndexManager>();
+ /** Map of DB containers to DN-based index managers. Used to start phase 2. */
+ private final List<IndexManager> DNIndexMgrList =
+ new LinkedList<IndexManager>();
+
+ /**
+ * Futures used to indicate when the index file writers are done flushing
+ * their work queues and have exited. End of phase one.
+ */
+ private final List<Future<Void>> scratchFileWriterFutures;
+ /**
+ * List of index file writer tasks. Used to signal stopScratchFileWriters to
+ * the index file writer tasks when the LDIF file has been done.
+ */
+ private final List<ScratchFileWriterTask> scratchFileWriterList;
+
+ /** Map of DNs to Suffix objects. */
+ private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();
+ /** Map of container ids to database containers. */
+ private final ConcurrentHashMap<Integer, Index> idContainerMap = new ConcurrentHashMap<Integer, Index>();
+ /** Map of container ids to entry containers. */
+ private final ConcurrentHashMap<Integer, EntryContainer> idECMap =
+ new ConcurrentHashMap<Integer, EntryContainer>();
+
+ /** Used to synchronize when a scratch file index writer is first setup. */
+ private final Object synObj = new Object();
+
+ /** Rebuild index manager used when rebuilding indexes. */
+ private final RebuildIndexManager rebuildManager;
+
+ /** Set to true if the backend was cleared. */
+ private final boolean clearedBackend;
+
+ /** Used to shutdown import if an error occurs in phase one. */
+ private volatile boolean isCanceled;
+ private volatile boolean isPhaseOneDone;
+
+ /** Number of phase one buffers. */
+ private int phaseOneBufferCount;
+
+ static
+ {
+ AttributeType attrType = DirectoryServer.getAttributeType("dn");
+ if (attrType == null)
+ {
+ attrType = DirectoryServer.getDefaultAttributeType("dn");
+ }
+ dnType = attrType;
+ }
+
+ /**
+ * Create a new import job with the specified rebuild index config.
+ *
+ * @param rebuildConfig
+ * The rebuild index configuration.
+ * @param cfg
+ * The local DB back-end configuration.
+ * @param envConfig
+ * The JEB environment config.
+ * @throws InitializationException
+ * If a problem occurs during initialization.
+ * @throws JebException
+ * If an error occurred when opening the DB.
+ * @throws ConfigException
+ * If a problem occurs during initialization.
+ */
+ public Importer(RebuildConfig rebuildConfig, LocalDBBackendCfg cfg,
+ EnvironmentConfig envConfig) throws InitializationException,
+ JebException, ConfigException
+ {
+ this.importConfiguration = null;
+ this.backendConfiguration = cfg;
+ this.tmpEnv = null;
+ this.threadCount = 1;
+ this.rebuildManager = new RebuildIndexManager(rebuildConfig, cfg);
+ this.indexCount = rebuildManager.getIndexCount();
+ this.clearedBackend = false;
+ this.scratchFileWriterList =
+ new ArrayList<ScratchFileWriterTask>(indexCount);
+ this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<Void>>();
+
+ this.tempDir = getTempDir(cfg, rebuildConfig.getTmpDirectory());
+ recursiveDelete(tempDir);
+ if (!tempDir.exists() && !tempDir.mkdirs())
+ {
+ throw new InitializationException(ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir));
+ }
+ this.skipDNValidation = true;
+ initializeDBEnv(envConfig);
+ }
+
+ /**
+ * Create a new import job with the specified ldif import config.
+ *
+ * @param importConfiguration
+ * The LDIF import configuration.
+ * @param localDBBackendCfg
+ * The local DB back-end configuration.
+ * @param envConfig
+ * The JEB environment config.
+ * @throws InitializationException
+ * If a problem occurs during initialization.
+ * @throws ConfigException
+ * If a problem occurs reading the configuration.
+ * @throws DatabaseException
+ * If an error occurred when opening the DB.
+ */
+ public Importer(LDIFImportConfig importConfiguration,
+ LocalDBBackendCfg localDBBackendCfg, EnvironmentConfig envConfig)
+ throws InitializationException, ConfigException, DatabaseException
+ {
+ this.rebuildManager = null;
+ this.importConfiguration = importConfiguration;
+ this.backendConfiguration = localDBBackendCfg;
+
+ if (importConfiguration.getThreadCount() == 0)
+ {
+ this.threadCount = Runtime.getRuntime().availableProcessors() * 2;
+ }
+ else
+ {
+ this.threadCount = importConfiguration.getThreadCount();
+ }
+
+ // Determine the number of indexes.
+ this.indexCount = getTotalIndexCount(localDBBackendCfg);
+
+ this.clearedBackend = mustClearBackend(importConfiguration, localDBBackendCfg);
+ this.scratchFileWriterList =
+ new ArrayList<ScratchFileWriterTask>(indexCount);
+ this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<Void>>();
+
+ this.tempDir = getTempDir(localDBBackendCfg, importConfiguration.getTmpDirectory());
+ recursiveDelete(tempDir);
+ if (!tempDir.exists() && !tempDir.mkdirs())
+ {
+ throw new InitializationException(ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir));
+ }
+ skipDNValidation = importConfiguration.getSkipDNValidation();
+ initializeDBEnv(envConfig);
+
+ // Set up temporary environment.
+ if (!skipDNValidation)
+ {
+ File envPath = new File(tempDir, TMPENV_DIR);
+ envPath.mkdirs();
+ this.tmpEnv = new TmpEnv(envPath);
+ }
+ else
+ {
+ this.tmpEnv = null;
+ }
+ }
+
+ /**
+ * Returns whether the backend must be cleared.
+ *
+ * @param importCfg the import configuration object
+ * @param backendCfg the backend configuration object
+ * @return true if the backend must be cleared, false otherwise
+ */
+ public static boolean mustClearBackend(LDIFImportConfig importCfg, LocalDBBackendCfg backendCfg)
+ {
+ return !importCfg.appendToExistingData()
+ && (importCfg.clearBackend() || backendCfg.getBaseDN().size() <= 1);
+ }
+
+ private File getTempDir(LocalDBBackendCfg localDBBackendCfg, String tmpDirectory)
+ {
+ File parentDir;
+ if (tmpDirectory != null)
+ {
+ parentDir = getFileForPath(tmpDirectory);
+ }
+ else
+ {
+ parentDir = getFileForPath(DEFAULT_TMP_DIR);
+ }
+ return new File(parentDir, localDBBackendCfg.getBackendId());
+ }
+
+ private int getTotalIndexCount(LocalDBBackendCfg localDBBackendCfg)
+ throws ConfigException
+ {
+ int indexes = 2; // dn2id, dn2uri
+ for (String indexName : localDBBackendCfg.listLocalDBIndexes())
+ {
+ LocalDBIndexCfg index = localDBBackendCfg.getLocalDBIndex(indexName);
+ SortedSet<IndexType> types = index.getIndexType();
+ if (types.contains(IndexType.EXTENSIBLE))
+ {
+ indexes += types.size() - 1 + index.getIndexExtensibleMatchingRule().size();
+ }
+ else
+ {
+ indexes += types.size();
+ }
+ }
+ return indexes;
+ }
+
+ /**
+ * Return the suffix instance in the specified map that matches the specified
+ * DN.
+ *
+ * @param dn
+ * The DN to search for.
+ * @param map
+ * The map to search.
+ * @return The suffix instance that matches the DN, or null if no match is
+ * found.
+ */
+ public static Suffix getMatchSuffix(DN dn, Map<DN, Suffix> map)
+ {
+ Suffix suffix = null;
+ DN nodeDN = dn;
+
+ while (suffix == null && nodeDN != null)
+ {
+ suffix = map.get(nodeDN);
+ if (suffix == null)
+ {
+ nodeDN = nodeDN.getParentDNInSuffix();
+ }
+ }
+ return suffix;
+ }
+
+ /**
+ * Calculate buffer sizes and initialize JEB properties based on memory.
+ *
+ * @param envConfig
+ * The environment config to use in the calculations.
+ * @throws InitializationException
+ * If a problem occurs during calculation.
+ */
+ private void initializeDBEnv(EnvironmentConfig envConfig) throws InitializationException
+ {
+ // Calculate amount of usable memory. This will need to take into account
+ // various fudge factors, including the number of IO buffers used by the
+ // scratch writers (1 per index).
+ calculateAvailableMemory();
+
+ final long usableMemory = availableMemory - (indexCount * READER_WRITER_BUFFER_SIZE);
+
+ // We need caching when doing DN validation or rebuilding indexes.
+ if (!skipDNValidation || rebuildManager != null)
+ {
+ // No DN validation: calculate memory for DB cache, DN2ID temporary cache,
+ // and buffers.
+ if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
+ {
+ dbCacheSize = 500 * KB;
+ tmpEnvCacheSize = 500 * KB;
+ }
+ else if (usableMemory < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
+ {
+ dbCacheSize = MIN_DB_CACHE_SIZE;
+ tmpEnvCacheSize = MIN_DB_CACHE_SIZE;
+ }
+ else if (!clearedBackend)
+ {
+ // Appending to existing data so reserve extra memory for the DB cache
+ // since it will be needed for dn2id queries.
+ dbCacheSize = usableMemory * 33 / 100;
+ tmpEnvCacheSize = usableMemory * 33 / 100;
+ }
+ else
+ {
+ dbCacheSize = MAX_DB_CACHE_SIZE;
+ tmpEnvCacheSize = usableMemory * 66 / 100;
+ }
+ }
+ else
+ {
+ // No DN validation: calculate memory for DB cache and buffers.
+
+ // No need for DN2ID cache.
+ tmpEnvCacheSize = 0;
+
+ if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
+ {
+ dbCacheSize = 500 * KB;
+ }
+ else if (usableMemory < MIN_DB_CACHE_MEMORY)
+ {
+ dbCacheSize = MIN_DB_CACHE_SIZE;
+ }
+ else
+ {
+ // No need to differentiate between append/clear backend, since dn2id is
+ // not being queried.
+ dbCacheSize = MAX_DB_CACHE_SIZE;
+ }
+ }
+
+ final long phaseOneBufferMemory = usableMemory - dbCacheSize - tmpEnvCacheSize;
+ final int oldThreadCount = threadCount;
+ if (indexCount != 0) // Avoid / by zero
+ {
+ while (true)
+ {
+ phaseOneBufferCount = 2 * indexCount * threadCount;
+
+ // Scratch writers allocate 4 buffers per index as well.
+ final int totalPhaseOneBufferCount = phaseOneBufferCount + (4 * indexCount);
+ long longBufferSize = phaseOneBufferMemory / totalPhaseOneBufferCount;
+ // We need (2 * bufferSize) to fit in an int for the insertByteStream
+ // and deleteByteStream constructors.
+ bufferSize = (int) Math.min(longBufferSize, Integer.MAX_VALUE / 2);
+
+ if (bufferSize > MAX_BUFFER_SIZE)
+ {
+ if (!skipDNValidation)
+ {
+ // The buffers are big enough: the memory is best used for the DN2ID temp DB
+ bufferSize = MAX_BUFFER_SIZE;
+
+ final long extraMemory = phaseOneBufferMemory - (totalPhaseOneBufferCount * bufferSize);
+ if (!clearedBackend)
+ {
+ dbCacheSize += extraMemory / 2;
+ tmpEnvCacheSize += extraMemory / 2;
+ }
+ else
+ {
+ tmpEnvCacheSize += extraMemory;
+ }
+ }
+
+ break;
+ }
+ else if (bufferSize > MIN_BUFFER_SIZE)
+ {
+ // This is acceptable.
+ break;
+ }
+ else if (threadCount > 1)
+ {
+ // Retry using less threads.
+ threadCount--;
+ }
+ else
+ {
+ // Not enough memory.
+ final long minimumPhaseOneBufferMemory = totalPhaseOneBufferCount * MIN_BUFFER_SIZE;
+ LocalizableMessage message =
+ ERR_IMPORT_LDIF_LACK_MEM.get(usableMemory,
+ minimumPhaseOneBufferMemory + dbCacheSize + tmpEnvCacheSize);
+ throw new InitializationException(message);
+ }
+ }
+ }
+
+ if (oldThreadCount != threadCount)
+ {
+ logger.info(NOTE_JEB_IMPORT_ADJUST_THREAD_COUNT, oldThreadCount, threadCount);
+ }
+
+ logger.info(NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, phaseOneBufferCount);
+ if (tmpEnvCacheSize > 0)
+ {
+ logger.info(NOTE_JEB_IMPORT_LDIF_TMP_ENV_MEM, tmpEnvCacheSize);
+ }
+ envConfig.setConfigParam(MAX_MEMORY, Long.toString(dbCacheSize));
+ logger.info(NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO, dbCacheSize, bufferSize);
+ }
+
+ /**
+ * Calculates the amount of available memory which can be used by this import,
+ * taking into account whether or not the import is running offline or online
+ * as a task.
+ */
+ private void calculateAvailableMemory()
+ {
+ final long totalAvailableMemory;
+ if (DirectoryServer.isRunning())
+ {
+ // Online import/rebuild.
+ Runtime runTime = Runtime.getRuntime();
+ // call twice gc to ensure finalizers are called
+ // and young to old gen references are properly gc'd
+ runTime.gc();
+ runTime.gc();
+ final long usedMemory = runTime.totalMemory() - runTime.freeMemory();
+ final long maxUsableMemory = Platform.getUsableMemoryForCaching();
+ final long usableMemory = maxUsableMemory - usedMemory;
+
+ final long configuredMemory;
+ if (backendConfiguration.getDBCacheSize() > 0)
+ {
+ configuredMemory = backendConfiguration.getDBCacheSize();
+ }
+ else
+ {
+ configuredMemory = backendConfiguration.getDBCachePercent() * Runtime.getRuntime().maxMemory() / 100;
+ }
+
+ // Round up to minimum of 16MB (e.g. unit tests only use 2% cache).
+ totalAvailableMemory = Math.max(Math.min(usableMemory, configuredMemory), 16 * MB);
+ }
+ else
+ {
+ // Offline import/rebuild.
+ totalAvailableMemory = Platform.getUsableMemoryForCaching();
+ }
+
+ // Now take into account various fudge factors.
+ int importMemPct = 90;
+ if (totalAvailableMemory <= SMALL_HEAP_SIZE)
+ {
+ // Be pessimistic when memory is low.
+ importMemPct -= 25;
+ }
+ if (rebuildManager != null)
+ {
+ // Rebuild seems to require more overhead.
+ importMemPct -= 15;
+ }
+
+ availableMemory = totalAvailableMemory * importMemPct / 100;
+ }
+
+ private void initializeIndexBuffers()
+ {
+ for (int i = 0; i < phaseOneBufferCount; i++)
+ {
+ freeBufferQueue.add(new IndexOutputBuffer(bufferSize));
+ }
+ }
+
+ private void initializeSuffixes() throws DatabaseException, ConfigException,
+ InitializationException
+ {
+ for (EntryContainer ec : rootContainer.getEntryContainers())
+ {
+ Suffix suffix = getSuffix(ec);
+ if (suffix != null)
+ {
+ dnSuffixMap.put(ec.getBaseDN(), suffix);
+ generateIndexID(suffix);
+ }
+ }
+ }
+
+ /**
+ * Mainly used to support multiple suffixes. Each index in each suffix gets an
+ * unique ID to identify which DB it needs to go to in phase two processing.
+ */
+ private void generateIndexID(Suffix suffix)
+ {
+ for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
+ {
+ AttributeIndex attributeIndex = mapEntry.getValue();
+ putInIdContainerMap(attributeIndex.getEqualityIndex());
+ putInIdContainerMap(attributeIndex.getPresenceIndex());
+ putInIdContainerMap(attributeIndex.getSubstringIndex());
+ putInIdContainerMap(attributeIndex.getOrderingIndex());
+ putInIdContainerMap(attributeIndex.getApproximateIndex());
+ Map<String, Collection<Index>> extensibleMap = attributeIndex.getExtensibleIndexes();
+ if (!extensibleMap.isEmpty())
+ {
+ putInIdContainerMap(extensibleMap.get(EXTENSIBLE_INDEXER_ID_SUBSTRING));
+ putInIdContainerMap(extensibleMap.get(EXTENSIBLE_INDEXER_ID_SHARED));
+ }
+ }
+ }
+
+ private void putInIdContainerMap(Collection<Index> indexes)
+ {
+ if (indexes != null)
+ {
+ for (Index index : indexes)
+ {
+ putInIdContainerMap(index);
+ }
+ }
+ }
+
+ private void putInIdContainerMap(Index index)
+ {
+ if (index != null)
+ {
+ int id = System.identityHashCode(index);
+ idContainerMap.putIfAbsent(id, index);
+ }
+ }
+
+ private Suffix getSuffix(EntryContainer entryContainer)
+ throws ConfigException, InitializationException
+ {
+ DN baseDN = entryContainer.getBaseDN();
+ EntryContainer sourceEntryContainer = null;
+ List<DN> includeBranches = new ArrayList<DN>();
+ List<DN> excludeBranches = new ArrayList<DN>();
+
+ if (!importConfiguration.appendToExistingData()
+ && !importConfiguration.clearBackend())
+ {
+ for (DN dn : importConfiguration.getExcludeBranches())
+ {
+ if (baseDN.equals(dn))
+ {
+ // This entire base DN was explicitly excluded. Skip.
+ return null;
+ }
+ if (baseDN.isAncestorOf(dn))
+ {
+ excludeBranches.add(dn);
+ }
+ }
+
+ if (!importConfiguration.getIncludeBranches().isEmpty())
+ {
+ for (DN dn : importConfiguration.getIncludeBranches())
+ {
+ if (baseDN.isAncestorOf(dn))
+ {
+ includeBranches.add(dn);
+ }
+ }
+
+ if (includeBranches.isEmpty())
+ {
+ /*
+ * There are no branches in the explicitly defined include list under
+ * this base DN. Skip this base DN all together.
+ */
+ return null;
+ }
+
+ // Remove any overlapping include branches.
+ Iterator<DN> includeBranchIterator = includeBranches.iterator();
+ while (includeBranchIterator.hasNext())
+ {
+ DN includeDN = includeBranchIterator.next();
+ if (!isAnyNotEqualAndAncestorOf(includeBranches, includeDN))
+ {
+ includeBranchIterator.remove();
+ }
+ }
+
+ // Remove any exclude branches that are not are not under a include
+ // branch since they will be migrated as part of the existing entries
+ // outside of the include branches anyways.
+ Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
+ while (excludeBranchIterator.hasNext())
+ {
+ DN excludeDN = excludeBranchIterator.next();
+ if (!isAnyAncestorOf(includeBranches, excludeDN))
+ {
+ excludeBranchIterator.remove();
+ }
+ }
+
+ if (excludeBranches.isEmpty()
+ && includeBranches.size() == 1
+ && includeBranches.get(0).equals(baseDN))
+ {
+ // This entire base DN is explicitly included in the import with
+ // no exclude branches that we need to migrate. Just clear the entry
+ // container.
+ clearSuffix(entryContainer);
+ }
+ else
+ {
+ // Create a temp entry container
+ sourceEntryContainer = entryContainer;
+ entryContainer = rootContainer.openEntryContainer(baseDN, baseDN.toIrreversibleReadableString()
+ + "_importTmp");
+ }
+ }
+ }
+ return new Suffix(entryContainer, sourceEntryContainer, includeBranches, excludeBranches);
+ }
+
+ private void clearSuffix(EntryContainer entryContainer)
+ {
+ entryContainer.lock();
+ entryContainer.clear();
+ entryContainer.unlock();
+ }
+
+ private boolean isAnyNotEqualAndAncestorOf(List<DN> dns, DN childDN)
+ {
+ for (DN dn : dns)
+ {
+ if (!dn.equals(childDN) && dn.isAncestorOf(childDN))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isAnyAncestorOf(List<DN> dns, DN childDN)
+ {
+ for (DN dn : dns)
+ {
+ if (dn.isAncestorOf(childDN))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Rebuild the indexes using the specified root container.
+ *
+ * @param rootContainer
+ * The root container to rebuild indexes in.
+ * @throws ConfigException
+ * If a configuration error occurred.
+ * @throws InitializationException
+ * If an initialization error occurred.
+ * @throws JebException
+ * If the JEB database had an error.
+ * @throws InterruptedException
+ * If an interrupted error occurred.
+ * @throws ExecutionException
+ * If an execution error occurred.
+ */
+ public void rebuildIndexes(RootContainer rootContainer)
+ throws ConfigException, InitializationException, JebException,
+ InterruptedException, ExecutionException
+ {
+ this.rootContainer = rootContainer;
+ long startTime = System.currentTimeMillis();
+
+ DiskSpaceMonitor tmpMonitor = createDiskSpaceMonitor(tempDir, "backend index rebuild tmp directory");
+ tmpMonitor.initializeMonitorProvider(null);
+ DirectoryServer.registerMonitorProvider(tmpMonitor);
+ File parentDirectory = getFileForPath(backendConfiguration.getDBDirectory());
+ File backendDirectory = new File(parentDirectory, backendConfiguration.getBackendId());
+ DiskSpaceMonitor dbMonitor = createDiskSpaceMonitor(backendDirectory, "backend index rebuild DB directory");
+ dbMonitor.initializeMonitorProvider(null);
+ DirectoryServer.registerMonitorProvider(dbMonitor);
+
+ try
+ {
+ rebuildManager.initialize();
+ rebuildManager.printStartMessage();
+ rebuildManager.rebuildIndexes();
+ recursiveDelete(tempDir);
+ rebuildManager.printStopMessage(startTime);
+ }
+ finally
+ {
+ DirectoryServer.deregisterMonitorProvider(tmpMonitor);
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
+ tmpMonitor.finalizeMonitorProvider();
+ dbMonitor.finalizeMonitorProvider();
+ }
+ }
+
+ /**
+ * Import a LDIF using the specified root container.
+ *
+ * @param rootContainer
+ * The root container to use during the import.
+ * @return A LDIF result.
+ * @throws ConfigException
+ * If the import failed because of an configuration error.
+ * @throws InitializationException
+ * If the import failed because of an initialization error.
+ * @throws JebException
+ * If the import failed due to a database error.
+ * @throws InterruptedException
+ * If the import failed due to an interrupted error.
+ * @throws ExecutionException
+ * If the import failed due to an execution error.
+ */
+ public LDIFImportResult processImport(RootContainer rootContainer)
+ throws ConfigException, InitializationException, JebException,
+ InterruptedException, ExecutionException
+ {
+ this.rootContainer = rootContainer;
+ DiskSpaceMonitor tmpMonitor = null;
+ DiskSpaceMonitor dbMonitor = null;
+ try {
+ try
+ {
+ reader = new ImportLDIFReader(importConfiguration, rootContainer);
+ }
+ catch (IOException ioe)
+ {
+ LocalizableMessage message = ERR_JEB_IMPORT_LDIF_READER_IO_ERROR.get();
+ throw new InitializationException(message, ioe);
+ }
+
+ tmpMonitor = createDiskSpaceMonitor(tempDir, "backend import tmp directory");
+ tmpMonitor.initializeMonitorProvider(null);
+ DirectoryServer.registerMonitorProvider(tmpMonitor);
+ File parentDirectory = getFileForPath(backendConfiguration.getDBDirectory());
+ File backendDirectory = new File(parentDirectory, backendConfiguration.getBackendId());
+ dbMonitor = createDiskSpaceMonitor(backendDirectory, "backend import DB directory");
+ dbMonitor.initializeMonitorProvider(null);
+ DirectoryServer.registerMonitorProvider(dbMonitor);
+
+ logger.info(NOTE_JEB_IMPORT_STARTING, DirectoryServer.getVersionString(),
+ BUILD_ID, REVISION_NUMBER);
+ logger.info(NOTE_JEB_IMPORT_THREAD_COUNT, threadCount);
+ initializeSuffixes();
+ setIndexesTrusted(false);
+
+ final long startTime = System.currentTimeMillis();
+ phaseOne();
+ isPhaseOneDone = true;
+ final long phaseOneFinishTime = System.currentTimeMillis();
+
+ if (!skipDNValidation)
+ {
+ tmpEnv.shutdown();
+ }
+ if (isCanceled)
+ {
+ throw new InterruptedException("Import processing canceled.");
+ }
+
+ final long phaseTwoTime = System.currentTimeMillis();
+ phaseTwo();
+ if (isCanceled)
+ {
+ throw new InterruptedException("Import processing canceled.");
+ }
+ final long phaseTwoFinishTime = System.currentTimeMillis();
+
+ setIndexesTrusted(true);
+ switchEntryContainers();
+ recursiveDelete(tempDir);
+ final long finishTime = System.currentTimeMillis();
+ final long importTime = finishTime - startTime;
+ logger.info(NOTE_JEB_IMPORT_PHASE_STATS, importTime / 1000,
+ (phaseOneFinishTime - startTime) / 1000,
+ (phaseTwoFinishTime - phaseTwoTime) / 1000);
+ float rate = 0;
+ if (importTime > 0)
+ {
+ rate = 1000f * reader.getEntriesRead() / importTime;
+ }
+ logger.info(NOTE_JEB_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount.get(),
+ reader.getEntriesIgnored(), reader.getEntriesRejected(),
+ migratedCount, importTime / 1000, rate);
+ return new LDIFImportResult(reader.getEntriesRead(),
+ reader.getEntriesRejected(), reader.getEntriesIgnored());
+ }
+ finally
+ {
+ close(reader);
+ if (!skipDNValidation)
+ {
+ try
+ {
+ tmpEnv.shutdown();
+ }
+ catch (Exception ignored)
+ {
+ // Do nothing.
+ }
+ }
+ if (tmpMonitor != null)
+ {
+ DirectoryServer.deregisterMonitorProvider(tmpMonitor);
+ tmpMonitor.finalizeMonitorProvider();
+ }
+ if (dbMonitor != null)
+ {
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
+ dbMonitor.finalizeMonitorProvider();
+ }
+ }
+ }
+
+ private DiskSpaceMonitor createDiskSpaceMonitor(File dir, String backendSuffix)
+ {
+ final LocalDBBackendCfg cfg = backendConfiguration;
+ return new DiskSpaceMonitor(cfg.getBackendId() + " " + backendSuffix, dir,
+ cfg.getDiskLowThreshold(), cfg.getDiskFullThreshold(), 5, TimeUnit.SECONDS, this);
+ }
+
+ private void recursiveDelete(File dir)
+ {
+ if (dir.listFiles() != null)
+ {
+ for (File f : dir.listFiles())
+ {
+ if (f.isDirectory())
+ {
+ recursiveDelete(f);
+ }
+ f.delete();
+ }
+ }
+ dir.delete();
+ }
+
+ private void switchEntryContainers() throws DatabaseException, JebException, InitializationException
+ {
+ for (Suffix suffix : dnSuffixMap.values())
+ {
+ DN baseDN = suffix.getBaseDN();
+ EntryContainer entryContainer = suffix.getSrcEntryContainer();
+ if (entryContainer != null)
+ {
+ final EntryContainer toDelete = rootContainer.unregisterEntryContainer(baseDN);
+ toDelete.lock();
+ toDelete.close();
+ toDelete.delete();
+ toDelete.unlock();
+
+ final EntryContainer replacement = suffix.getEntryContainer();
+ replacement.lock();
+ replacement.setDatabasePrefix(baseDN.toIrreversibleReadableString());
+ replacement.unlock();
+ rootContainer.registerEntryContainer(baseDN, replacement);
+ }
+ }
+ }
+
+ private void setIndexesTrusted(boolean trusted) throws JebException
+ {
+ try
+ {
+ for (Suffix s : dnSuffixMap.values())
+ {
+ s.setIndexesTrusted(trusted);
+ }
+ }
+ catch (DatabaseException ex)
+ {
+ throw new JebException(NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage()));
+ }
+ }
+
+ private void phaseOne() throws InterruptedException, ExecutionException
+ {
+ initializeIndexBuffers();
+
+ final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
+ scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
+ scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
+ bufferSortService = Executors.newFixedThreadPool(threadCount);
+ final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
+
+ final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
+ tasks.add(new MigrateExistingTask());
+ getAll(execService.invokeAll(tasks));
+ tasks.clear();
+
+ if (importConfiguration.appendToExistingData()
+ && importConfiguration.replaceExistingEntries())
+ {
+ for (int i = 0; i < threadCount; i++)
+ {
+ tasks.add(new AppendReplaceTask());
+ }
+ }
+ else
+ {
+ for (int i = 0; i < threadCount; i++)
+ {
+ tasks.add(new ImportTask());
+ }
+ }
+ getAll(execService.invokeAll(tasks));
+ tasks.clear();
+
+ tasks.add(new MigrateExcludedTask());
+ getAll(execService.invokeAll(tasks));
+
+ stopScratchFileWriters();
+ getAll(scratchFileWriterFutures);
+
+ shutdownAll(timerService, execService, bufferSortService, scratchFileWriterService);
+
+ // Try to clear as much memory as possible.
+ clearAll(scratchFileWriterList, scratchFileWriterFutures, freeBufferQueue);
+ indexKeyQueueMap.clear();
+ }
+
+ private void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
+ {
+ timerService.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
+ private void shutdownAll(ExecutorService... executorServices) throws InterruptedException
+ {
+ for (ExecutorService executorService : executorServices)
+ {
+ executorService.shutdown();
+ }
+ for (ExecutorService executorService : executorServices)
+ {
+ executorService.awaitTermination(30, TimeUnit.SECONDS);
+ }
+ }
+
+ private void clearAll(Collection<?>... cols)
+ {
+ for (Collection<?> col : cols)
+ {
+ col.clear();
+ }
+ }
+
+ private void phaseTwo() throws InterruptedException, ExecutionException
+ {
+ ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
+ scheduleAtFixedRate(timerService, new SecondPhaseProgressTask(reader.getEntriesRead()));
+ try
+ {
+ processIndexFiles();
+ }
+ finally
+ {
+ shutdownAll(timerService);
+ }
+ }
+
+ private void processIndexFiles() throws InterruptedException, ExecutionException
+ {
+ if (bufferCount.get() == 0)
+ {
+ return;
+ }
+ int dbThreads = Runtime.getRuntime().availableProcessors();
+ if (dbThreads < 4)
+ {
+ dbThreads = 4;
+ }
+
+ // Calculate memory / buffer counts.
+ final long usableMemory = availableMemory - dbCacheSize;
+ int readAheadSize;
+ int buffers;
+ while (true)
+ {
+ final List<IndexManager> allIndexMgrs = new ArrayList<IndexManager>(DNIndexMgrList);
+ allIndexMgrs.addAll(indexMgrList);
+ Collections.sort(allIndexMgrs, Collections.reverseOrder());
+
+ buffers = 0;
+ final int limit = Math.min(dbThreads, allIndexMgrs.size());
+ for (int i = 0; i < limit; i++)
+ {
+ buffers += allIndexMgrs.get(i).numberOfBuffers;
+ }
+
+ readAheadSize = (int) (usableMemory / buffers);
+ if (readAheadSize > bufferSize)
+ {
+ // Cache size is never larger than the buffer size.
+ readAheadSize = bufferSize;
+ break;
+ }
+ else if (readAheadSize > MIN_READ_AHEAD_CACHE_SIZE)
+ {
+ // This is acceptable.
+ break;
+ }
+ else if (dbThreads > 1)
+ {
+ // Reduce thread count.
+ dbThreads--;
+ }
+ else
+ {
+ // Not enough memory - will need to do batching for the biggest indexes.
+ readAheadSize = MIN_READ_AHEAD_CACHE_SIZE;
+ buffers = (int) (usableMemory / readAheadSize);
+
+ logger.warn(WARN_IMPORT_LDIF_LACK_MEM_PHASE_TWO, usableMemory);
+ break;
+ }
+ }
+
+ // Ensure that there are minimum two threads available for parallel
+ // processing of smaller indexes.
+ dbThreads = Math.max(2, dbThreads);
+
+ logger.info(NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT, availableMemory, readAheadSize, buffers);
+
+ // Start indexing tasks.
+ List<Future<Void>> futures = new LinkedList<Future<Void>>();
+ ExecutorService dbService = Executors.newFixedThreadPool(dbThreads);
+ Semaphore permits = new Semaphore(buffers);
+
+ // Start DN processing first.
+ submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
+ submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
+ getAll(futures);
+ shutdownAll(dbService);
+ }
+
+ private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService, Semaphore permits,
+ int buffers, int readAheadSize, List<Future<Void>> futures)
+ {
+ for (IndexManager indexMgr : indexMgrs)
+ {
+ futures.add(dbService.submit(new IndexDBWriteTask(indexMgr, permits, buffers, readAheadSize)));
+ }
+ }
+
+ private <T> void getAll(List<Future<T>> futures) throws InterruptedException, ExecutionException
+ {
+ for (Future<?> result : futures)
+ {
+ result.get();
+ }
+ }
+
+ private void stopScratchFileWriters()
+ {
+ final IndexOutputBuffer stopProcessing = IndexOutputBuffer.poison();
+ for (ScratchFileWriterTask task : scratchFileWriterList)
+ {
+ task.queue.add(stopProcessing);
+ }
+ }
+
+ /** Task used to migrate excluded branch. */
+ private final class MigrateExcludedTask extends ImportTask
+ {
+
+ /** {@inheritDoc} */
+ @Override
+ public Void call() throws Exception
+ {
+ for (Suffix suffix : dnSuffixMap.values())
+ {
+ EntryContainer entryContainer = suffix.getSrcEntryContainer();
+ if (entryContainer != null && !suffix.getExcludeBranches().isEmpty())
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ LockMode lockMode = LockMode.DEFAULT;
+ logger.info(NOTE_JEB_IMPORT_MIGRATION_START, "excluded", suffix.getBaseDN());
+ Cursor cursor = entryContainer.getDN2ID().openCursor(null, CursorConfig.READ_COMMITTED);
+ Comparator<byte[]> comparator = entryContainer.getDN2ID().getComparator();
+ try
+ {
+ for (DN excludedDN : suffix.getExcludeBranches())
+ {
+ byte[] bytes = JebFormat.dnToDNKey(excludedDN, suffix.getBaseDN().size());
+ key.setData(bytes);
+ OperationStatus status = cursor.getSearchKeyRange(key, data, lockMode);
+ if (status == OperationStatus.SUCCESS
+ && Arrays.equals(key.getData(), bytes))
+ {
+ // This is the base entry for a branch that was excluded in the
+ // import so we must migrate all entries in this branch over to
+ // the new entry container.
+ byte[] end = Arrays.copyOf(bytes, bytes.length + 1);
+ end[end.length - 1] = 0x01;
+
+ while (status == OperationStatus.SUCCESS
+ && comparator.compare(key.getData(), end) < 0
+ && !importConfiguration.isCancelled() && !isCanceled)
+ {
+ EntryID id = new EntryID(data);
+ Entry entry = entryContainer.getID2Entry().get(null, id, LockMode.DEFAULT);
+ processEntry(entry, rootContainer.getNextEntryID(), suffix);
+ migratedCount++;
+ status = cursor.getNext(key, data, lockMode);
+ }
+ }
+ }
+ flushIndexBuffers();
+ }
+ catch (Exception e)
+ {
+ logger.error(ERR_JEB_IMPORT_LDIF_MIGRATE_EXCLUDED_TASK_ERR, e.getMessage());
+ isCanceled = true;
+ throw e;
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+ }
+ return null;
+ }
+ }
+
+ /** Task to migrate existing entries. */
+ private final class MigrateExistingTask extends ImportTask
+ {
+
+ /** {@inheritDoc} */
+ @Override
+ public Void call() throws Exception
+ {
+ for (Suffix suffix : dnSuffixMap.values())
+ {
+ EntryContainer entryContainer = suffix.getSrcEntryContainer();
+ if (entryContainer != null && !suffix.getIncludeBranches().isEmpty())
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ LockMode lockMode = LockMode.DEFAULT;
+ logger.info(NOTE_JEB_IMPORT_MIGRATION_START, "existing", suffix.getBaseDN());
+ Cursor cursor = entryContainer.getDN2ID().openCursor(null, null);
+ try
+ {
+ final List<byte[]> includeBranches = includeBranchesAsBytes(suffix);
+ OperationStatus status = cursor.getFirst(key, data, lockMode);
+ while (status == OperationStatus.SUCCESS
+ && !importConfiguration.isCancelled() && !isCanceled)
+ {
+ if (!find(includeBranches, key.getData()))
+ {
+ EntryID id = new EntryID(data);
+ Entry entry = entryContainer.getID2Entry().get(null, id, LockMode.DEFAULT);
+ processEntry(entry, rootContainer.getNextEntryID(), suffix);
+ migratedCount++;
+ status = cursor.getNext(key, data, lockMode);
+ }
+ else
+ {
+ // This is the base entry for a branch that will be included
+ // in the import so we don't want to copy the branch to the
+ // new entry container.
+
+ /**
+ * Advance the cursor to next entry at the same level in the DIT
+ * skipping all the entries in this branch. Set the next
+ * starting value to a value of equal length but slightly
+ * greater than the previous DN. Since keys are compared in
+ * reverse order we must set the first byte (the comma). No
+ * possibility of overflow here.
+ */
+ byte[] begin = Arrays.copyOf(key.getData(), key.getSize() + 1);
+ begin[begin.length - 1] = 0x01;
+ key.setData(begin);
+ status = cursor.getSearchKeyRange(key, data, lockMode);
+ }
+ }
+ flushIndexBuffers();
+ }
+ catch (Exception e)
+ {
+ logger.error(ERR_JEB_IMPORT_LDIF_MIGRATE_EXISTING_TASK_ERR, e.getMessage());
+ isCanceled = true;
+ throw e;
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+ }
+ return null;
+ }
+
+ private List<byte[]> includeBranchesAsBytes(Suffix suffix)
+ {
+ List<byte[]> includeBranches = new ArrayList<byte[]>(suffix.getIncludeBranches().size());
+ for (DN includeBranch : suffix.getIncludeBranches())
+ {
+ if (includeBranch.isDescendantOf(suffix.getBaseDN()))
+ {
+ includeBranches.add(JebFormat.dnToDNKey(includeBranch, suffix.getBaseDN().size()));
+ }
+ }
+ return includeBranches;
+ }
+
+ private boolean find(List<byte[]> arrays, byte[] arrayToFind)
+ {
+ for (byte[] array : arrays)
+ {
+ if (Arrays.equals(array, arrayToFind))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Task to perform append/replace processing.
+ */
+ private class AppendReplaceTask extends ImportTask
+ {
+ private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
+ private final Set<ByteString> deleteKeySet = new HashSet<ByteString>();
+ private final EntryInformation entryInfo = new EntryInformation();
+ private Entry oldEntry;
+ private EntryID entryID;
+
+ /** {@inheritDoc} */
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ while (true)
+ {
+ if (importConfiguration.isCancelled() || isCanceled)
+ {
+ freeBufferQueue.add(IndexOutputBuffer.poison());
+ return null;
+ }
+ oldEntry = null;
+ Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
+ if (entry == null)
+ {
+ break;
+ }
+ entryID = entryInfo.getEntryID();
+ Suffix suffix = entryInfo.getSuffix();
+ processEntry(entry, suffix);
+ }
+ flushIndexBuffers();
+ }
+ catch (Exception e)
+ {
+ logger.error(ERR_JEB_IMPORT_LDIF_APPEND_REPLACE_TASK_ERR, e.getMessage());
+ isCanceled = true;
+ throw e;
+ }
+ return null;
+ }
+
+ void processEntry(Entry entry, Suffix suffix)
+ throws DatabaseException, DirectoryException, JebException, InterruptedException
+ {
+ DN entryDN = entry.getName();
+ DN2ID dn2id = suffix.getDN2ID();
+ EntryID oldID = dn2id.get(null, entryDN, LockMode.DEFAULT);
+ if (oldID != null)
+ {
+ oldEntry = suffix.getID2Entry().get(null, oldID, LockMode.DEFAULT);
+ }
+ if (oldEntry == null)
+ {
+ if (!skipDNValidation && !dnSanityCheck(entryDN, entry, suffix))
+ {
+ suffix.removePending(entryDN);
+ return;
+ }
+ suffix.removePending(entryDN);
+ processDN2ID(suffix, entryDN, entryID);
+ }
+ else
+ {
+ suffix.removePending(entryDN);
+ entryID = oldID;
+ }
+ processDN2URI(suffix, oldEntry, entry);
+ suffix.getID2Entry().put(null, entryID, entry);
+ if (oldEntry != null)
+ {
+ processAllIndexes(suffix, entry, entryID);
+ }
+ else
+ {
+ processIndexes(suffix, entry, entryID);
+ }
+ importCount.getAndIncrement();
+ }
+
+ void processAllIndexes(Suffix suffix, Entry entry, EntryID entryID)
+ throws DatabaseException, DirectoryException, JebException, InterruptedException
+ {
+ for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
+ {
+ AttributeType attributeType = mapEntry.getKey();
+ fillIndexKey(suffix, mapEntry, entry, attributeType, entryID);
+ }
+ }
+
+ @Override
+ void processAttribute(Index index, Entry entry, EntryID entryID, IndexingOptions options,
+ IndexKey indexKey) throws DatabaseException, InterruptedException
+ {
+ if (oldEntry != null)
+ {
+ deleteKeySet.clear();
+ index.indexer.indexEntry(oldEntry, deleteKeySet, options);
+ for (ByteString delKey : deleteKeySet)
+ {
+ processKey(index, delKey.toByteArray(), entryID, indexComparator, indexKey, false);
+ }
+ }
+ insertKeySet.clear();
+ index.indexer.indexEntry(entry, insertKeySet, options);
+ for (ByteString key : insertKeySet)
+ {
+ processKey(index, key.toByteArray(), entryID, indexComparator, indexKey, true);
+ }
+ }
+ }
+
+ /**
+ * This task performs phase reading and processing of the entries read from
+ * the LDIF file(s). This task is used if the append flag wasn't specified.
+ */
+ private class ImportTask implements Callable<Void>
+ {
+ private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<IndexKey, IndexOutputBuffer>();
+ private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
+ private final EntryInformation entryInfo = new EntryInformation();
+ private DatabaseEntry keyEntry = new DatabaseEntry();
+ private DatabaseEntry valEntry = new DatabaseEntry();
+
+ /** {@inheritDoc} */
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ while (true)
+ {
+ if (importConfiguration.isCancelled() || isCanceled)
+ {
+ freeBufferQueue.add(IndexOutputBuffer.poison());
+ return null;
+ }
+ Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
+ if (entry == null)
+ {
+ break;
+ }
+ EntryID entryID = entryInfo.getEntryID();
+ Suffix suffix = entryInfo.getSuffix();
+ processEntry(entry, entryID, suffix);
+ }
+ flushIndexBuffers();
+ }
+ catch (Exception e)
+ {
+ logger.error(ERR_JEB_IMPORT_LDIF_IMPORT_TASK_ERR, e.getMessage());
+ isCanceled = true;
+ throw e;
+ }
+ return null;
+ }
+
+ void processEntry(Entry entry, EntryID entryID, Suffix suffix)
+ throws DatabaseException, DirectoryException, JebException, InterruptedException
+ {
+ DN entryDN = entry.getName();
+ if (!skipDNValidation && !dnSanityCheck(entryDN, entry, suffix))
+ {
+ suffix.removePending(entryDN);
+ return;
+ }
+ suffix.removePending(entryDN);
+ processDN2ID(suffix, entryDN, entryID);
+ processDN2URI(suffix, null, entry);
+ processIndexes(suffix, entry, entryID);
+ suffix.getID2Entry().put(null, entryID, entry);
+ importCount.getAndIncrement();
+ }
+
+ /** Examine the DN for duplicates and missing parents. */
+ boolean dnSanityCheck(DN entryDN, Entry entry, Suffix suffix)
+ throws JebException, InterruptedException
+ {
+ //Perform parent checking.
+ DN parentDN = suffix.getEntryContainer().getParentWithinBase(entryDN);
+ if (parentDN != null && !suffix.isParentProcessed(parentDN, tmpEnv, clearedBackend))
+ {
+ reader.rejectEntry(entry, ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN));
+ return false;
+ }
+ //If the backend was not cleared, then the dn2id needs to checked first
+ //for DNs that might not exist in the DN cache. If the DN is not in
+ //the suffixes dn2id DB, then the dn cache is used.
+ if (!clearedBackend)
+ {
+ EntryID id = suffix.getDN2ID().get(null, entryDN, LockMode.DEFAULT);
+ if (id != null || !tmpEnv.insert(entryDN, keyEntry, valEntry))
+ {
+ reader.rejectEntry(entry, WARN_JEB_IMPORT_ENTRY_EXISTS.get());
+ return false;
+ }
+ }
+ else if (!tmpEnv.insert(entryDN, keyEntry, valEntry))
+ {
+ reader.rejectEntry(entry, WARN_JEB_IMPORT_ENTRY_EXISTS.get());
+ return false;
+ }
+ return true;
+ }
+
+ void processIndexes(Suffix suffix, Entry entry, EntryID entryID)
+ throws DatabaseException, DirectoryException, JebException, InterruptedException
+ {
+ for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
+ {
+ AttributeType attributeType = mapEntry.getKey();
+ if (entry.hasAttribute(attributeType))
+ {
+ fillIndexKey(suffix, mapEntry, entry, attributeType, entryID);
+ }
+ }
+ }
+
+ void fillIndexKey(Suffix suffix, Map.Entry<AttributeType, AttributeIndex> mapEntry, Entry entry,
+ AttributeType attrType, EntryID entryID)
+ throws DatabaseException, InterruptedException, DirectoryException, JebException
+ {
+ final AttributeIndex attrIndex = mapEntry.getValue();
+ final IndexingOptions options = attrIndex.getIndexingOptions();
+
+ processAttribute(attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, entry, attrType, entryID, options);
+ processAttribute(attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, entry, attrType, entryID, options);
+ processAttribute(attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, entry, attrType, entryID, options);
+ processAttribute(attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, entry, attrType, entryID, options);
+ processAttribute(attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, entry, attrType, entryID, options);
+
+ for (VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes())
+ {
+ Transaction transaction = null;
+ vlvIdx.addEntry(transaction, entryID, entry);
+ }
+ Map<String, Collection<Index>> extensibleMap = attrIndex.getExtensibleIndexes();
+ if (!extensibleMap.isEmpty())
+ {
+ Collection<Index> subIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SUBSTRING);
+ processAttributes(subIndexes, ImportIndexType.EX_SUBSTRING, entry, attrType, entryID, options);
+ Collection<Index> sharedIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SHARED);
+ processAttributes(sharedIndexes, ImportIndexType.EX_SHARED, entry, attrType, entryID, options);
+ }
+ }
+
+ private void processAttribute(Index index, ImportIndexType presence, Entry entry,
+ AttributeType attributeType, EntryID entryID, IndexingOptions options) throws InterruptedException
+ {
+ if (index != null)
+ {
+ IndexKey indexKey = new IndexKey(attributeType, presence, index.getIndexEntryLimit());
+ processAttribute(index, entry, entryID, options, indexKey);
+ }
+ }
+
+ private void processAttributes(Collection<Index> indexes, ImportIndexType indexType, Entry entry,
+ AttributeType attributeType, EntryID entryID, IndexingOptions options) throws InterruptedException
+ {
+ if (indexes != null)
+ {
+ for (Index index : indexes)
+ {
+ IndexKey indexKey = new IndexKey(attributeType, indexType, index.getIndexEntryLimit());
+ processAttribute(index, entry, entryID, options, indexKey);
+ }
+ }
+ }
+
+ void processAttribute(Index index, Entry entry, EntryID entryID, IndexingOptions options,
+ IndexKey indexKey) throws DatabaseException, InterruptedException
+ {
+ insertKeySet.clear();
+ index.indexer.indexEntry(entry, insertKeySet, options);
+ for (ByteString key : insertKeySet)
+ {
+ processKey(index, key.toByteArray(), entryID, indexComparator, indexKey, true);
+ }
+ }
+
+ void flushIndexBuffers() throws InterruptedException, ExecutionException
+ {
+ Set<Map.Entry<IndexKey, IndexOutputBuffer>> set = indexBufferMap.entrySet();
+ Iterator<Map.Entry<IndexKey, IndexOutputBuffer>> setIterator = set.iterator();
+ while (setIterator.hasNext())
+ {
+ Map.Entry<IndexKey, IndexOutputBuffer> e = setIterator.next();
+ IndexKey indexKey = e.getKey();
+ IndexOutputBuffer indexBuffer = e.getValue();
+ setIterator.remove();
+ indexBuffer.setComparator(indexComparator);
+ indexBuffer.setIndexKey(indexKey);
+ indexBuffer.discard();
+ Future<Void> future = bufferSortService.submit(new SortTask(indexBuffer));
+ future.get();
+ }
+ }
+
+ int processKey(DatabaseContainer container, byte[] key, EntryID entryID,
+ IndexOutputBuffer.ComparatorBuffer<byte[]> comparator,
+ IndexKey indexKey, boolean insert) throws InterruptedException
+ {
+ int sizeNeeded = IndexOutputBuffer.getRequiredSize(key.length, entryID.longValue());
+ IndexOutputBuffer indexBuffer = indexBufferMap.get(indexKey);
+ if (indexBuffer == null)
+ {
+ indexBuffer = getNewIndexBuffer(sizeNeeded);
+ indexBufferMap.put(indexKey, indexBuffer);
+ }
+ else if (!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
+ {
+ // complete the current buffer...
+ indexBuffer.setComparator(comparator);
+ indexBuffer.setIndexKey(indexKey);
+ bufferSortService.submit(new SortTask(indexBuffer));
+ // ... and get a new one
+ indexBuffer = getNewIndexBuffer(sizeNeeded);
+ indexBufferMap.put(indexKey, indexBuffer);
+ }
+ int id = System.identityHashCode(container);
+ indexBuffer.add(key, entryID, id, insert);
+ return id;
+ }
+
+ IndexOutputBuffer getNewIndexBuffer(int size) throws InterruptedException
+ {
+ IndexOutputBuffer indexBuffer;
+ if (size > bufferSize)
+ {
+ indexBuffer = new IndexOutputBuffer(size);
+ indexBuffer.discard();
+ }
+ else
+ {
+ indexBuffer = freeBufferQueue.take();
+ if (indexBuffer == null)
+ {
+ throw new InterruptedException("Index buffer processing error.");
+ }
+ }
+ if (indexBuffer.isPoison())
+ {
+ throw new InterruptedException("Cancel processing received.");
+ }
+ return indexBuffer;
+ }
+
+ void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
+ throws InterruptedException
+ {
+ DN2ID dn2id = suffix.getDN2ID();
+ byte[] dnBytes = JebFormat.dnToDNKey(dn, suffix.getBaseDN().size());
+ IndexKey indexKey = new IndexKey(dnType, ImportIndexType.DN, 1);
+ int id = processKey(dn2id, dnBytes, entryID, indexComparator, indexKey, true);
+ idECMap.putIfAbsent(id, suffix.getEntryContainer());
+ }
+
+ void processDN2URI(Suffix suffix, Entry oldEntry, Entry newEntry)
+ throws DatabaseException
+ {
+ DN2URI dn2uri = suffix.getDN2URI();
+ if (oldEntry != null)
+ {
+ dn2uri.replaceEntry(null, oldEntry, newEntry);
+ }
+ else
+ {
+ dn2uri.addEntry(null, newEntry);
+ }
+ }
+ }
+
+ /**
+ * This task reads sorted records from the temporary index scratch files,
+ * processes the records and writes the results to the index database. The DN
+ * index is treated differently then non-DN indexes.
+ */
+ private final class IndexDBWriteTask implements Callable<Void>
+ {
+ private final IndexManager indexMgr;
+ private final DatabaseEntry dbKey, dbValue;
+ private final int cacheSize;
+ private final Map<Integer, DNState> dnStateMap = new HashMap<Integer, DNState>();
+ private final Map<Integer, Index> indexMap = new HashMap<Integer, Index>();
+ private final Semaphore permits;
+ private final int maxPermits;
+ private final AtomicLong bytesRead = new AtomicLong();
+ private long lastBytesRead;
+ private final AtomicInteger keyCount = new AtomicInteger();
+ private RandomAccessFile bufferFile;
+ private DataInputStream bufferIndexFile;
+ private int remainingBuffers;
+ private volatile int totalBatches;
+ private AtomicInteger batchNumber = new AtomicInteger();
+ private int nextBufferID;
+ private int ownedPermits;
+ private volatile boolean isRunning;
+
+ /**
+ * Creates a new index DB writer.
+ *
+ * @param indexMgr
+ * The index manager.
+ * @param permits
+ * The semaphore used for restricting the number of buffer
+ * allocations.
+ * @param maxPermits
+ * The maximum number of buffers which can be allocated.
+ * @param cacheSize
+ * The buffer cache size.
+ */
+ public IndexDBWriteTask(IndexManager indexMgr, Semaphore permits,
+ int maxPermits, int cacheSize)
+ {
+ this.indexMgr = indexMgr;
+ this.permits = permits;
+ this.maxPermits = maxPermits;
+ this.cacheSize = cacheSize;
+
+ this.dbKey = new DatabaseEntry();
+ this.dbValue = new DatabaseEntry();
+ }
+
+ /**
+ * Initializes this task.
+ *
+ * @throws IOException
+ * If an IO error occurred.
+ */
+ public void beginWriteTask() throws IOException
+ {
+ bufferFile = new RandomAccessFile(indexMgr.getBufferFile(), "r");
+ bufferIndexFile =
+ new DataInputStream(new BufferedInputStream(new FileInputStream(
+ indexMgr.getBufferIndexFile())));
+
+ remainingBuffers = indexMgr.getNumberOfBuffers();
+ totalBatches = (remainingBuffers / maxPermits) + 1;
+ batchNumber.set(0);
+ nextBufferID = 0;
+ ownedPermits = 0;
+
+ logger.info(NOTE_JEB_IMPORT_LDIF_INDEX_STARTED, indexMgr.getBufferFileName(),
+ remainingBuffers, totalBatches);
+
+ indexMgr.setIndexDBWriteTask(this);
+ isRunning = true;
+ }
+
+ /**
+ * Returns the next batch of buffers to be processed, blocking until enough
+ * buffer permits are available.
+ *
+ * @return The next batch of buffers, or {@code null} if there are no more
+ * buffers to be processed.
+ * @throws Exception
+ * If an exception occurred.
+ */
+ public NavigableSet<IndexInputBuffer> getNextBufferBatch() throws Exception
+ {
+ // First release any previously acquired permits.
+ if (ownedPermits > 0)
+ {
+ permits.release(ownedPermits);
+ ownedPermits = 0;
+ }
+
+ // Block until we can either get enough permits for all buffers, or the
+ // maximum number of permits.
+ final int permitRequest = Math.min(remainingBuffers, maxPermits);
+ if (permitRequest == 0)
+ {
+ // No more work to do.
+ return null;
+ }
+ permits.acquire(permitRequest);
+
+ // Update counters.
+ ownedPermits = permitRequest;
+ remainingBuffers -= permitRequest;
+ batchNumber.incrementAndGet();
+
+ // Create all the index buffers for the next batch.
+ final NavigableSet<IndexInputBuffer> buffers = new TreeSet<IndexInputBuffer>();
+ for (int i = 0; i < permitRequest; i++)
+ {
+ final long bufferBegin = bufferIndexFile.readLong();
+ final long bufferEnd = bufferIndexFile.readLong();
+ final IndexInputBuffer b =
+ new IndexInputBuffer(indexMgr, bufferFile.getChannel(),
+ bufferBegin, bufferEnd, nextBufferID++, cacheSize);
+ buffers.add(b);
+ }
+
+ return buffers;
+ }
+
+ /**
+ * Finishes this task.
+ */
+ public void endWriteTask()
+ {
+ isRunning = false;
+
+ // First release any previously acquired permits.
+ if (ownedPermits > 0)
+ {
+ permits.release(ownedPermits);
+ ownedPermits = 0;
+ }
+
+ try
+ {
+ if (indexMgr.isDN2ID())
+ {
+ for (DNState dnState : dnStateMap.values())
+ {
+ dnState.flush();
+ }
+ if (!isCanceled)
+ {
+ logger.info(NOTE_JEB_IMPORT_LDIF_DN_CLOSE, indexMgr.getDNCount());
+ }
+ }
+ else
+ {
+ if (!isCanceled)
+ {
+ logger.info(NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE, indexMgr.getBufferFileName());
+ }
+ }
+ }
+ finally
+ {
+ close(bufferFile, bufferIndexFile);
+
+ indexMgr.getBufferFile().delete();
+ indexMgr.getBufferIndexFile().delete();
+ }
+ }
+
+ /**
+ * Print out progress stats.
+ *
+ * @param deltaTime
+ * The time since the last update.
+ */
+ public void printStats(long deltaTime)
+ {
+ if (isRunning)
+ {
+ final long bufferFileSize = indexMgr.getBufferFileSize();
+ final long tmpBytesRead = bytesRead.get();
+ final int currentBatch = batchNumber.get();
+
+ final long bytesReadInterval = tmpBytesRead - lastBytesRead;
+ final int bytesReadPercent =
+ Math.round((100f * tmpBytesRead) / bufferFileSize);
+
+ // Kilo and milli approximately cancel out.
+ final long kiloBytesRate = bytesReadInterval / deltaTime;
+ final long kiloBytesRemaining = (bufferFileSize - tmpBytesRead) / 1024;
+
+ logger.info(NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT, indexMgr.getBufferFileName(),
+ bytesReadPercent, kiloBytesRemaining, kiloBytesRate, currentBatch, totalBatches);
+
+ lastBytesRead = tmpBytesRead;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Void call() throws Exception, DirectoryException
+ {
+ ByteBuffer key = null;
+ ImportIDSet insertIDSet = null;
+ ImportIDSet deleteIDSet = null;
+ Integer indexID = null;
+
+ if (isCanceled)
+ {
+ return null;
+ }
+
+ try
+ {
+ beginWriteTask();
+
+ NavigableSet<IndexInputBuffer> bufferSet;
+ while ((bufferSet = getNextBufferBatch()) != null)
+ {
+ if (isCanceled)
+ {
+ return null;
+ }
+
+ while (!bufferSet.isEmpty())
+ {
+ IndexInputBuffer b = bufferSet.pollFirst();
+ if (key == null)
+ {
+ indexID = b.getIndexID();
+
+ if (indexMgr.isDN2ID())
+ {
+ insertIDSet = new ImportIDSet(1, 1, false);
+ deleteIDSet = new ImportIDSet(1, 1, false);
+ }
+ else
+ {
+ Index index = idContainerMap.get(indexID);
+ int limit = index.getIndexEntryLimit();
+ boolean doCount = index.getMaintainCount();
+ insertIDSet = new ImportIDSet(1, limit, doCount);
+ deleteIDSet = new ImportIDSet(1, limit, doCount);
+ }
+
+ key = ByteBuffer.allocate(b.getKeyLen());
+ key.flip();
+ b.getKey(key);
+
+ b.mergeIDSet(insertIDSet);
+ b.mergeIDSet(deleteIDSet);
+ insertIDSet.setKey(key);
+ deleteIDSet.setKey(key);
+ }
+ else if (b.compare(key, indexID) != 0)
+ {
+ addToDB(insertIDSet, deleteIDSet, indexID);
+ keyCount.incrementAndGet();
+
+ indexID = b.getIndexID();
+
+ if (indexMgr.isDN2ID())
+ {
+ insertIDSet = new ImportIDSet(1, 1, false);
+ deleteIDSet = new ImportIDSet(1, 1, false);
+ }
+ else
+ {
+ Index index = idContainerMap.get(indexID);
+ int limit = index.getIndexEntryLimit();
+ boolean doCount = index.getMaintainCount();
+ insertIDSet = new ImportIDSet(1, limit, doCount);
+ deleteIDSet = new ImportIDSet(1, limit, doCount);
+ }
+
+ key.clear();
+ if (b.getKeyLen() > key.capacity())
+ {
+ key = ByteBuffer.allocate(b.getKeyLen());
+ }
+ key.flip();
+ b.getKey(key);
+
+ b.mergeIDSet(insertIDSet);
+ b.mergeIDSet(deleteIDSet);
+ insertIDSet.setKey(key);
+ deleteIDSet.setKey(key);
+ }
+ else
+ {
+ b.mergeIDSet(insertIDSet);
+ b.mergeIDSet(deleteIDSet);
+ }
+
+ if (b.hasMoreData())
+ {
+ b.getNextRecord();
+ bufferSet.add(b);
+ }
+ }
+
+ if (key != null)
+ {
+ addToDB(insertIDSet, deleteIDSet, indexID);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ logger.error(ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR, indexMgr.getBufferFileName(), e.getMessage());
+ throw e;
+ }
+ finally
+ {
+ endWriteTask();
+ }
+ return null;
+ }
+
+ private void addToDB(ImportIDSet insertSet, ImportIDSet deleteSet,
+ int indexID) throws DirectoryException
+ {
+ if (!indexMgr.isDN2ID())
+ {
+ if (deleteSet.size() > 0 || !deleteSet.isDefined())
+ {
+ dbKey.setData(deleteSet.getKey().array(), 0, deleteSet.getKey().limit());
+ final Index index = idContainerMap.get(indexID);
+ index.delete(dbKey, deleteSet, dbValue);
+ if (!indexMap.containsKey(indexID))
+ {
+ indexMap.put(indexID, index);
+ }
+ }
+ if (insertSet.size() > 0 || !insertSet.isDefined())
+ {
+ dbKey.setData(insertSet.getKey().array(), 0, insertSet.getKey().limit());
+ final Index index = idContainerMap.get(indexID);
+ index.insert(dbKey, insertSet, dbValue);
+ if (!indexMap.containsKey(indexID))
+ {
+ indexMap.put(indexID, index);
+ }
+ }
+ }
+ else
+ {
+ addDN2ID(insertSet, indexID);
+ }
+ }
+
+ private void addDN2ID(ImportIDSet record, Integer indexID)
+ throws DirectoryException
+ {
+ DNState dnState;
+ if (!dnStateMap.containsKey(indexID))
+ {
+ dnState = new DNState(idECMap.get(indexID));
+ dnStateMap.put(indexID, dnState);
+ }
+ else
+ {
+ dnState = dnStateMap.get(indexID);
+ }
+ if (dnState.checkParent(record))
+ {
+ dnState.writeToDB();
+ }
+ }
+
+ private void addBytesRead(int bytesRead)
+ {
+ this.bytesRead.addAndGet(bytesRead);
+ }
+
+ /**
+ * This class is used to by a index DB merge thread performing DN processing
+ * to keep track of the state of individual DN2ID index processing.
+ */
+ class DNState
+ {
+ private static final int DN_STATE_CACHE_SIZE = 64 * KB;
+
+ private ByteBuffer parentDN, lastDN;
+ private EntryID parentID, lastID, entryID;
+ private final DatabaseEntry dnKey, dnValue;
+ private final TreeMap<ByteBuffer, EntryID> parentIDMap;
+ private final EntryContainer entryContainer;
+ private final Map<byte[], ImportIDSet> id2childTree;
+ private final Map<byte[], ImportIDSet> id2subtreeTree;
+ private final int childLimit, subTreeLimit;
+ private final boolean childDoCount, subTreeDoCount;
+
+ DNState(EntryContainer entryContainer)
+ {
+ this.entryContainer = entryContainer;
+ parentIDMap = new TreeMap<ByteBuffer, EntryID>();
+ Comparator<byte[]> childComparator =
+ entryContainer.getID2Children().getComparator();
+ id2childTree = new TreeMap<byte[], ImportIDSet>(childComparator);
+ childLimit = entryContainer.getID2Children().getIndexEntryLimit();
+ childDoCount = entryContainer.getID2Children().getMaintainCount();
+ Comparator<byte[]> subComparator =
+ entryContainer.getID2Subtree().getComparator();
+ subTreeLimit = entryContainer.getID2Subtree().getIndexEntryLimit();
+ subTreeDoCount = entryContainer.getID2Subtree().getMaintainCount();
+ id2subtreeTree = new TreeMap<byte[], ImportIDSet>(subComparator);
+ dnKey = new DatabaseEntry();
+ dnValue = new DatabaseEntry();
+ lastDN = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
+ }
+
+ private ByteBuffer getParent(ByteBuffer buffer)
+ {
+ int parentIndex = JebFormat.findDNKeyParent(buffer.array(), 0, buffer.limit());
+ if (parentIndex < 0)
+ {
+ // This is the root or base DN
+ return null;
+ }
+ ByteBuffer parent = buffer.duplicate();
+ parent.limit(parentIndex);
+ return parent;
+ }
+
+ private ByteBuffer deepCopy(ByteBuffer srcBuffer, ByteBuffer destBuffer)
+ {
+ if (destBuffer == null
+ || destBuffer.clear().remaining() < srcBuffer.limit())
+ {
+ byte[] bytes = new byte[srcBuffer.limit()];
+ System.arraycopy(srcBuffer.array(), 0, bytes, 0, srcBuffer.limit());
+ return ByteBuffer.wrap(bytes);
+ }
+ else
+ {
+ destBuffer.put(srcBuffer);
+ destBuffer.flip();
+ return destBuffer;
+ }
+ }
+
+ /** Why do we still need this if we are checking parents in the first phase? */
+ private boolean checkParent(ImportIDSet record) throws DatabaseException
+ {
+ dnKey.setData(record.getKey().array(), 0, record.getKey().limit());
+ byte[] v = record.toDatabase();
+ long v1 = JebFormat.entryIDFromDatabase(v);
+ dnValue.setData(v);
+
+ entryID = new EntryID(v1);
+ parentDN = getParent(record.getKey());
+
+ //Bypass the cache for append data, lookup the parent in DN2ID and return.
+ if (importConfiguration != null
+ && importConfiguration.appendToExistingData())
+ {
+ //If null is returned than this is a suffix DN.
+ if (parentDN != null)
+ {
+ DatabaseEntry key = new DatabaseEntry(parentDN.array(), 0, parentDN.limit());
+ DatabaseEntry value = new DatabaseEntry();
+ OperationStatus status = entryContainer.getDN2ID().read(null, key, value, LockMode.DEFAULT);
+ if (status == OperationStatus.SUCCESS)
+ {
+ parentID = new EntryID(value);
+ }
+ else
+ {
+ // We have a missing parent. Maybe parent checking was turned off?
+ // Just ignore.
+ parentID = null;
+ return false;
+ }
+ }
+ }
+ else if (parentIDMap.isEmpty())
+ {
+ parentIDMap.put(deepCopy(record.getKey(), null), entryID);
+ return true;
+ }
+ else if (lastDN != null && lastDN.equals(parentDN))
+ {
+ parentIDMap.put(deepCopy(lastDN, null), lastID);
+ parentID = lastID;
+ lastDN = deepCopy(record.getKey(), lastDN);
+ lastID = entryID;
+ return true;
+ }
+ else if (parentIDMap.lastKey().equals(parentDN))
+ {
+ parentID = parentIDMap.get(parentDN);
+ lastDN = deepCopy(record.getKey(), lastDN);
+ lastID = entryID;
+ return true;
+ }
+ else if (parentIDMap.containsKey(parentDN))
+ {
+ EntryID newParentID = parentIDMap.get(parentDN);
+ ByteBuffer key = parentIDMap.lastKey();
+ while (!parentDN.equals(key))
+ {
+ parentIDMap.remove(key);
+ key = parentIDMap.lastKey();
+ }
+ parentIDMap.put(deepCopy(record.getKey(), null), entryID);
+ parentID = newParentID;
+ lastDN = deepCopy(record.getKey(), lastDN);
+ lastID = entryID;
+ }
+ else
+ {
+ // We have a missing parent. Maybe parent checking was turned off?
+ // Just ignore.
+ parentID = null;
+ return false;
+ }
+ return true;
+ }
+
+ private void id2child(EntryID childID) throws DirectoryException
+ {
+ ImportIDSet idSet;
+ if (parentID != null)
+ {
+ if (!id2childTree.containsKey(parentID.getDatabaseEntry().getData()))
+ {
+ idSet = new ImportIDSet(1, childLimit, childDoCount);
+ id2childTree.put(parentID.getDatabaseEntry().getData(), idSet);
+ }
+ else
+ {
+ idSet = id2childTree.get(parentID.getDatabaseEntry().getData());
+ }
+ idSet.addEntryID(childID);
+ if (id2childTree.size() > DN_STATE_CACHE_SIZE)
+ {
+ flushMapToDB(id2childTree, entryContainer.getID2Children(), true);
+ }
+ }
+ else
+ {
+ throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION,
+ ERR_PARENT_ENTRY_IS_MISSING.get());
+ }
+ }
+
+ private EntryID getParentID(ByteBuffer dn) throws DatabaseException
+ {
+ // Bypass the cache for append data, lookup the parent DN in the DN2ID db
+ if (importConfiguration == null || !importConfiguration.appendToExistingData())
+ {
+ return parentIDMap.get(dn);
+ }
+ DatabaseEntry key = new DatabaseEntry(dn.array(), 0, dn.limit());
+ DatabaseEntry value = new DatabaseEntry();
+ OperationStatus status = entryContainer.getDN2ID().read(null, key, value, LockMode.DEFAULT);
+ if (status == OperationStatus.SUCCESS)
+ {
+ return new EntryID(value);
+ }
+ return null;
+ }
+
+ private void id2SubTree(EntryID childID) throws DirectoryException
+ {
+ if (parentID != null)
+ {
+ ImportIDSet idSet;
+ if (!id2subtreeTree.containsKey(parentID.getDatabaseEntry().getData()))
+ {
+ idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount);
+ id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet);
+ }
+ else
+ {
+ idSet = id2subtreeTree.get(parentID.getDatabaseEntry().getData());
+ }
+ idSet.addEntryID(childID);
+ // TODO:
+ // Instead of doing this,
+ // we can just walk to parent cache if available
+ for (ByteBuffer dn = getParent(parentDN); dn != null; dn = getParent(dn))
+ {
+ EntryID nodeID = getParentID(dn);
+ if (nodeID == null)
+ {
+ // We have a missing parent. Maybe parent checking was turned off?
+ // Just ignore.
+ break;
+ }
+ if (!id2subtreeTree.containsKey(nodeID.getDatabaseEntry().getData()))
+ {
+ idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount);
+ id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet);
+ }
+ else
+ {
+ idSet = id2subtreeTree.get(nodeID.getDatabaseEntry().getData());
+ }
+ idSet.addEntryID(childID);
+ }
+ if (id2subtreeTree.size() > DN_STATE_CACHE_SIZE)
+ {
+ flushMapToDB(id2subtreeTree, entryContainer.getID2Subtree(), true);
+ }
+ }
+ else
+ {
+ throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION,
+ ERR_PARENT_ENTRY_IS_MISSING.get());
+ }
+ }
+
+ public void writeToDB() throws DirectoryException
+ {
+ entryContainer.getDN2ID().put(null, dnKey, dnValue);
+ indexMgr.addTotDNCount(1);
+ if (parentDN != null)
+ {
+ id2child(entryID);
+ id2SubTree(entryID);
+ }
+ }
+
+ private void flushMapToDB(Map<byte[], ImportIDSet> map, Index index,
+ boolean clearMap)
+ {
+ for (Map.Entry<byte[], ImportIDSet> e : map.entrySet())
+ {
+ byte[] key = e.getKey();
+ ImportIDSet idSet = e.getValue();
+ dnKey.setData(key);
+ index.insert(dnKey, idSet, dnValue);
+ }
+ if (clearMap)
+ {
+ map.clear();
+ }
+ }
+
+ public void flush()
+ {
+ flushMapToDB(id2childTree, entryContainer.getID2Children(), false);
+ flushMapToDB(id2subtreeTree, entryContainer.getID2Subtree(), false);
+ }
+ }
+ }
+
+ /**
+ * This task writes the temporary scratch index files using the sorted buffers
+ * read from a blocking queue private to each index.
+ */
+ private final class ScratchFileWriterTask implements Callable<Void>
+ {
+ private final int DRAIN_TO = 3;
+ private final IndexManager indexMgr;
+ private final BlockingQueue<IndexOutputBuffer> queue;
+ private final ByteArrayOutputStream insertByteStream = new ByteArrayOutputStream(2 * bufferSize);
+ private final ByteArrayOutputStream deleteByteStream = new ByteArrayOutputStream(2 * bufferSize);
+ private final DataOutputStream bufferStream;
+ private final DataOutputStream bufferIndexStream;
+ private final byte[] tmpArray = new byte[8];
+ private final TreeSet<IndexOutputBuffer> indexSortedSet = new TreeSet<IndexOutputBuffer>();
+ private int insertKeyCount, deleteKeyCount;
+ private int bufferCount;
+ private boolean poisonSeen;
+
+ public ScratchFileWriterTask(BlockingQueue<IndexOutputBuffer> queue,
+ IndexManager indexMgr) throws FileNotFoundException
+ {
+ this.queue = queue;
+ this.indexMgr = indexMgr;
+ this.bufferStream = newDataOutputStream(indexMgr.getBufferFile());
+ this.bufferIndexStream = newDataOutputStream(indexMgr.getBufferIndexFile());
+ }
+
+ private DataOutputStream newDataOutputStream(File file) throws FileNotFoundException
+ {
+ return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), READER_WRITER_BUFFER_SIZE));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Void call() throws IOException, InterruptedException
+ {
+ long offset = 0;
+ List<IndexOutputBuffer> l = new LinkedList<IndexOutputBuffer>();
+ try
+ {
+ while (true)
+ {
+ final IndexOutputBuffer indexBuffer = queue.take();
+ long beginOffset = offset;
+ long bufferLen;
+ if (!queue.isEmpty())
+ {
+ queue.drainTo(l, DRAIN_TO);
+ l.add(indexBuffer);
+ bufferLen = writeIndexBuffers(l);
+ for (IndexOutputBuffer id : l)
+ {
+ if (!id.isDiscarded())
+ {
+ id.reset();
+ freeBufferQueue.add(id);
+ }
+ }
+ l.clear();
+ }
+ else
+ {
+ if (indexBuffer.isPoison())
+ {
+ break;
+ }
+ bufferLen = writeIndexBuffer(indexBuffer);
+ if (!indexBuffer.isDiscarded())
+ {
+ indexBuffer.reset();
+ freeBufferQueue.add(indexBuffer);
+ }
+ }
+ offset += bufferLen;
+
+ // Write buffer index information.
+ bufferIndexStream.writeLong(beginOffset);
+ bufferIndexStream.writeLong(offset);
+
+ bufferCount++;
+ Importer.this.bufferCount.incrementAndGet();
+
+ if (poisonSeen)
+ {
+ break;
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ logger.error(ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR,
+ indexMgr.getBufferFile().getAbsolutePath(), e.getMessage());
+ isCanceled = true;
+ throw e;
+ }
+ finally
+ {
+ close(bufferStream, bufferIndexStream);
+ indexMgr.setBufferInfo(bufferCount, indexMgr.getBufferFile().length());
+ }
+ return null;
+ }
+
+ private long writeIndexBuffer(IndexOutputBuffer indexBuffer) throws IOException
+ {
+ indexBuffer.setPosition(-1);
+ resetStreams();
+
+ long bufferLen = 0;
+ final int numberKeys = indexBuffer.getNumberKeys();
+ for (int i = 0; i < numberKeys; i++)
+ {
+ if (indexBuffer.getPosition() == -1)
+ {
+ indexBuffer.setPosition(i);
+ insertOrDeleteKey(indexBuffer, i);
+ continue;
+ }
+ if (!indexBuffer.compare(i))
+ {
+ bufferLen += writeRecord(indexBuffer);
+ indexBuffer.setPosition(i);
+ resetStreams();
+ }
+ insertOrDeleteKeyCheckEntryLimit(indexBuffer, i);
+ }
+ if (indexBuffer.getPosition() != -1)
+ {
+ bufferLen += writeRecord(indexBuffer);
+ }
+ return bufferLen;
+ }
+
+ private long writeIndexBuffers(List<IndexOutputBuffer> buffers) throws IOException
+ {
+ resetStreams();
+
+ long id = 0;
+ long bufferLen = 0;
+ for (IndexOutputBuffer b : buffers)
+ {
+ if (b.isPoison())
+ {
+ poisonSeen = true;
+ }
+ else
+ {
+ b.setPosition(0);
+ b.setID(id++);
+ indexSortedSet.add(b);
+ }
+ }
+ byte[] saveKey = null;
+ int saveIndexID = 0;
+ while (!indexSortedSet.isEmpty())
+ {
+ final IndexOutputBuffer b = indexSortedSet.pollFirst();
+ if (saveKey == null)
+ {
+ saveKey = b.getKey();
+ saveIndexID = b.getIndexID();
+ insertOrDeleteKey(b, b.getPosition());
+ }
+ else if (!b.compare(saveKey, saveIndexID))
+ {
+ bufferLen += writeRecord(saveKey, saveIndexID);
+ resetStreams();
+ saveKey = b.getKey();
+ saveIndexID = b.getIndexID();
+ insertOrDeleteKey(b, b.getPosition());
+ }
+ else
+ {
+ insertOrDeleteKeyCheckEntryLimit(b, b.getPosition());
+ }
+ if (b.hasMoreData())
+ {
+ b.nextRecord();
+ indexSortedSet.add(b);
+ }
+ }
+ if (saveKey != null)
+ {
+ bufferLen += writeRecord(saveKey, saveIndexID);
+ }
+ return bufferLen;
+ }
+
+ private void resetStreams()
+ {
+ insertByteStream.reset();
+ insertKeyCount = 0;
+ deleteByteStream.reset();
+ deleteKeyCount = 0;
+ }
+
+ private void insertOrDeleteKey(IndexOutputBuffer indexBuffer, int i)
+ {
+ if (indexBuffer.isInsertRecord(i))
+ {
+ indexBuffer.writeID(insertByteStream, i);
+ insertKeyCount++;
+ }
+ else
+ {
+ indexBuffer.writeID(deleteByteStream, i);
+ deleteKeyCount++;
+ }
+ }
+
+ private void insertOrDeleteKeyCheckEntryLimit(IndexOutputBuffer indexBuffer, int i)
+ {
+ if (indexBuffer.isInsertRecord(i))
+ {
+ if (insertKeyCount++ <= indexMgr.getLimit())
+ {
+ indexBuffer.writeID(insertByteStream, i);
+ }
+ }
+ else
+ {
+ indexBuffer.writeID(deleteByteStream, i);
+ deleteKeyCount++;
+ }
+ }
+
+ private int writeByteStreams() throws IOException
+ {
+ if (insertKeyCount > indexMgr.getLimit())
+ {
+ insertKeyCount = 1;
+ insertByteStream.reset();
+ PackedInteger.writeInt(tmpArray, 0, -1);
+ insertByteStream.write(tmpArray, 0, 1);
+ }
+ int insertSize = PackedInteger.getWriteIntLength(insertKeyCount);
+ PackedInteger.writeInt(tmpArray, 0, insertKeyCount);
+ bufferStream.write(tmpArray, 0, insertSize);
+ if (insertByteStream.size() > 0)
+ {
+ insertByteStream.writeTo(bufferStream);
+ }
+ int deleteSize = PackedInteger.getWriteIntLength(deleteKeyCount);
+ PackedInteger.writeInt(tmpArray, 0, deleteKeyCount);
+ bufferStream.write(tmpArray, 0, deleteSize);
+ if (deleteByteStream.size() > 0)
+ {
+ deleteByteStream.writeTo(bufferStream);
+ }
+ return insertSize + deleteSize;
+ }
+
+ private int writeHeader(int indexID, int keySize) throws IOException
+ {
+ bufferStream.writeInt(indexID);
+ int packedSize = PackedInteger.getWriteIntLength(keySize);
+ PackedInteger.writeInt(tmpArray, 0, keySize);
+ bufferStream.write(tmpArray, 0, packedSize);
+ return packedSize;
+ }
+
+ private int writeRecord(IndexOutputBuffer b) throws IOException
+ {
+ int keySize = b.getKeySize();
+ int packedSize = writeHeader(b.getIndexID(), keySize);
+ b.writeKey(bufferStream);
+ packedSize += writeByteStreams();
+ return packedSize + keySize + insertByteStream.size() + deleteByteStream.size() + 4;
+ }
+
+ private int writeRecord(byte[] k, int indexID) throws IOException
+ {
+ int packedSize = writeHeader(indexID, k.length);
+ bufferStream.write(k);
+ packedSize += writeByteStreams();
+ return packedSize + k.length + insertByteStream.size() + deleteByteStream.size() + 4;
+ }
+ }
+
+ /**
+ * This task main function is to sort the index buffers given to it from the
+ * import tasks reading the LDIF file. It will also create a index file writer
+ * task and corresponding queue if needed. The sorted index buffers are put on
+ * the index file writer queues for writing to a temporary file.
+ */
+ private final class SortTask implements Callable<Void>
+ {
+
+ private final IndexOutputBuffer indexBuffer;
+
+ public SortTask(IndexOutputBuffer indexBuffer)
+ {
+ this.indexBuffer = indexBuffer;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Void call() throws Exception
+ {
+ if ((importConfiguration != null && importConfiguration.isCancelled())
+ || isCanceled)
+ {
+ isCanceled = true;
+ return null;
+ }
+ indexBuffer.sort();
+ if (!indexKeyQueueMap.containsKey(indexBuffer.getIndexKey()))
+ {
+ createIndexWriterTask(indexBuffer.getIndexKey());
+ }
+ indexKeyQueueMap.get(indexBuffer.getIndexKey()).add(indexBuffer);
+ return null;
+ }
+
+ private void createIndexWriterTask(IndexKey indexKey) throws FileNotFoundException
+ {
+ synchronized (synObj)
+ {
+ if (indexKeyQueueMap.containsKey(indexKey))
+ {
+ return;
+ }
+ boolean isDN2ID = ImportIndexType.DN.equals(indexKey.getIndexType());
+ IndexManager indexMgr = new IndexManager(indexKey.getName(), isDN2ID, indexKey.getEntryLimit());
+ if (isDN2ID)
+ {
+ DNIndexMgrList.add(indexMgr);
+ }
+ else
+ {
+ indexMgrList.add(indexMgr);
+ }
+ BlockingQueue<IndexOutputBuffer> newQueue =
+ new ArrayBlockingQueue<IndexOutputBuffer>(phaseOneBufferCount);
+ ScratchFileWriterTask indexWriter = new ScratchFileWriterTask(newQueue, indexMgr);
+ scratchFileWriterList.add(indexWriter);
+ scratchFileWriterFutures.add(scratchFileWriterService.submit(indexWriter));
+ indexKeyQueueMap.put(indexKey, newQueue);
+ }
+ }
+ }
+
+ /**
+ * The index manager class has several functions:
+ * <ol>
+ * <li>It is used to carry information about index processing created in phase one to phase two</li>
+ * <li>It collects statistics about phase two processing for each index</li>
+ * <li>It manages opening and closing the scratch index files</li>
+ * </ol>
+ */
+ final class IndexManager implements Comparable<IndexManager>
+ {
+ private final File bufferFile;
+ private final String bufferFileName;
+ private final File bufferIndexFile;
+ private final boolean isDN2ID;
+ private final int limit;
+
+ private int numberOfBuffers;
+ private long bufferFileSize;
+ private long totalDNs;
+ private volatile IndexDBWriteTask writer;
+
+ private IndexManager(String fileName, boolean isDN2ID, int limit)
+ {
+ this.bufferFileName = fileName;
+ this.bufferFile = new File(tempDir, bufferFileName);
+ this.bufferIndexFile = new File(tempDir, bufferFileName + ".index");
+
+ this.isDN2ID = isDN2ID;
+ this.limit = limit > 0 ? limit : Integer.MAX_VALUE;
+ }
+
+ private void setIndexDBWriteTask(IndexDBWriteTask writer)
+ {
+ this.writer = writer;
+ }
+
+ private File getBufferFile()
+ {
+ return bufferFile;
+ }
+
+ private long getBufferFileSize()
+ {
+ return bufferFileSize;
+ }
+
+ private File getBufferIndexFile()
+ {
+ return bufferIndexFile;
+ }
+
+ private void setBufferInfo(int numberOfBuffers, long bufferFileSize)
+ {
+ this.numberOfBuffers = numberOfBuffers;
+ this.bufferFileSize = bufferFileSize;
+ }
+
+ /**
+ * Updates the bytes read counter.
+ *
+ * @param bytesRead
+ * The number of bytes read.
+ */
+ void addBytesRead(int bytesRead)
+ {
+ if (writer != null)
+ {
+ writer.addBytesRead(bytesRead);
+ }
+ }
+
+ private void addTotDNCount(int delta)
+ {
+ totalDNs += delta;
+ }
+
+ private long getDNCount()
+ {
+ return totalDNs;
+ }
+
+ private boolean isDN2ID()
+ {
+ return isDN2ID;
+ }
+
+ private void printStats(long deltaTime)
+ {
+ if (writer != null)
+ {
+ writer.printStats(deltaTime);
+ }
+ }
+
+ /**
+ * Returns the file name associated with this index manager.
+ *
+ * @return The file name associated with this index manager.
+ */
+ String getBufferFileName()
+ {
+ return bufferFileName;
+ }
+
+ private int getLimit()
+ {
+ return limit;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int compareTo(IndexManager mgr)
+ {
+ return numberOfBuffers - mgr.numberOfBuffers;
+ }
+
+ private int getNumberOfBuffers()
+ {
+ return numberOfBuffers;
+ }
+ }
+
+ /**
+ * The rebuild index manager handles all rebuild index related processing.
+ */
+ private class RebuildIndexManager extends ImportTask implements
+ DiskSpaceMonitorHandler
+ {
+
+ /** Rebuild index configuration. */
+ private final RebuildConfig rebuildConfig;
+
+ /** Local DB backend configuration. */
+ private final LocalDBBackendCfg cfg;
+
+ /** Map of index keys to indexes. */
+ private final Map<IndexKey, Index> indexMap =
+ new LinkedHashMap<IndexKey, Index>();
+
+ /** Map of index keys to extensible indexes. */
+ private final Map<IndexKey, Collection<Index>> extensibleIndexMap =
+ new LinkedHashMap<IndexKey, Collection<Index>>();
+
+ /** List of VLV indexes. */
+ private final List<VLVIndex> vlvIndexes = new LinkedList<VLVIndex>();
+
+ /** The DN2ID index. */
+ private DN2ID dn2id;
+
+ /** The DN2URI index. */
+ private DN2URI dn2uri;
+
+ /** Total entries to be processed. */
+ private long totalEntries;
+
+ /** Total entries processed. */
+ private final AtomicLong entriesProcessed = new AtomicLong(0);
+
+ /** The suffix instance. */
+ private Suffix suffix;
+
+ /** The entry container. */
+ private EntryContainer entryContainer;
+
+ /**
+ * Create an instance of the rebuild index manager using the specified
+ * parameters.
+ *
+ * @param rebuildConfig
+ * The rebuild configuration to use.
+ * @param cfg
+ * The local DB configuration to use.
+ */
+ public RebuildIndexManager(RebuildConfig rebuildConfig,
+ LocalDBBackendCfg cfg)
+ {
+ this.rebuildConfig = rebuildConfig;
+ this.cfg = cfg;
+ }
+
+ /**
+ * Initialize a rebuild index manager.
+ *
+ * @throws ConfigException
+ * If an configuration error occurred.
+ * @throws InitializationException
+ * If an initialization error occurred.
+ */
+ public void initialize() throws ConfigException, InitializationException
+ {
+ entryContainer = rootContainer.getEntryContainer(rebuildConfig.getBaseDN());
+ suffix = new Suffix(entryContainer, null, null, null);
+ if (suffix == null)
+ {
+ throw new InitializationException(
+ ERR_JEB_REBUILD_SUFFIX_ERROR.get(rebuildConfig.getBaseDN()));
+ }
+ }
+
+ /**
+ * Print start message.
+ *
+ * @throws DatabaseException
+ * If an database error occurred.
+ */
+ public void printStartMessage() throws DatabaseException
+ {
+ totalEntries = suffix.getID2Entry().getRecordCount();
+
+ switch (rebuildConfig.getRebuildMode())
+ {
+ case ALL:
+ logger.info(NOTE_JEB_REBUILD_ALL_START, totalEntries);
+ break;
+ case DEGRADED:
+ logger.info(NOTE_JEB_REBUILD_DEGRADED_START, totalEntries);
+ break;
+ default:
+ if (!rebuildConfig.isClearDegradedState()
+ && logger.isInfoEnabled())
+ {
+ String indexes = Utils.joinAsString(", ", rebuildConfig.getRebuildList());
+ logger.info(NOTE_JEB_REBUILD_START, indexes, totalEntries);
+ }
+ break;
+ }
+ }
+
+ /**
+ * Print stop message.
+ *
+ * @param startTime
+ * The time the rebuild started.
+ */
+ public void printStopMessage(long startTime)
+ {
+ long finishTime = System.currentTimeMillis();
+ long totalTime = finishTime - startTime;
+ float rate = 0;
+ if (totalTime > 0)
+ {
+ rate = 1000f * entriesProcessed.get() / totalTime;
+ }
+
+ if (!rebuildConfig.isClearDegradedState())
+ {
+ logger.info(NOTE_JEB_REBUILD_FINAL_STATUS, entriesProcessed.get(), totalTime / 1000, rate);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Void call() throws Exception
+ {
+ ID2Entry id2entry = entryContainer.getID2Entry();
+ DiskOrderedCursor cursor =
+ id2entry.openCursor(DiskOrderedCursorConfig.DEFAULT);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ try
+ {
+ while (cursor.getNext(key, data, null) == OperationStatus.SUCCESS)
+ {
+ if (isCanceled)
+ {
+ return null;
+ }
+ EntryID entryID = new EntryID(key);
+ Entry entry =
+ ID2Entry.entryFromDatabase(ByteString.wrap(data.getData()),
+ entryContainer.getRootContainer().getCompressedSchema());
+ processEntry(entry, entryID);
+ entriesProcessed.getAndIncrement();
+ }
+ flushIndexBuffers();
+ }
+ catch (Exception e)
+ {
+ logger.traceException(e);
+ logger.error(ERR_JEB_IMPORT_LDIF_REBUILD_INDEX_TASK_ERR, stackTraceToSingleLineString(e));
+ isCanceled = true;
+ throw e;
+ }
+ finally
+ {
+ close(cursor);
+ }
+ return null;
+ }
+
+ /**
+ * Perform rebuild index processing.
+ *
+ * @throws DatabaseException
+ * If an database error occurred.
+ * @throws InterruptedException
+ * If an interrupted error occurred.
+ * @throws ExecutionException
+ * If an Execution error occurred.
+ * @throws JebException
+ * If an JEB error occurred.
+ */
+ public void rebuildIndexes() throws DatabaseException,
+ InterruptedException, ExecutionException, JebException
+ {
+ // Sets only the needed indexes.
+ setIndexesListsToBeRebuilt();
+
+ if (!rebuildConfig.isClearDegradedState())
+ {
+ // If not in a 'clear degraded state' operation,
+ // need to rebuild the indexes.
+ setRebuildListIndexesTrusted(false);
+ clearIndexes(true);
+ phaseOne();
+ if (isCanceled)
+ {
+ throw new InterruptedException("Rebuild Index canceled.");
+ }
+ phaseTwo();
+ }
+ else
+ {
+ logger.info(NOTE_JEB_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
+ }
+
+ setRebuildListIndexesTrusted(true);
+ }
+
+ @SuppressWarnings("fallthrough")
+ private void setIndexesListsToBeRebuilt() throws JebException
+ {
+ // Depends on rebuild mode, (re)building indexes' lists.
+ final RebuildMode mode = rebuildConfig.getRebuildMode();
+ switch (mode)
+ {
+ case ALL:
+ rebuildIndexMap(false);
+ // falls through
+ case DEGRADED:
+ if (mode == RebuildMode.ALL
+ || !entryContainer.getID2Children().isTrusted()
+ || !entryContainer.getID2Subtree().isTrusted())
+ {
+ dn2id = entryContainer.getDN2ID();
+ }
+ if (mode == RebuildMode.ALL || entryContainer.getDN2URI() == null)
+ {
+ dn2uri = entryContainer.getDN2URI();
+ }
+ if (mode == RebuildMode.DEGRADED
+ || entryContainer.getAttributeIndexes().isEmpty())
+ {
+ rebuildIndexMap(true); // only degraded.
+ }
+ if (mode == RebuildMode.ALL || vlvIndexes.isEmpty())
+ {
+ vlvIndexes.addAll(new LinkedList<VLVIndex>(entryContainer.getVLVIndexes()));
+ }
+ break;
+
+ case USER_DEFINED:
+ // false may be required if the user wants to rebuild specific index.
+ rebuildIndexMap(false);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void rebuildIndexMap(final boolean onlyDegraded)
+ {
+ // rebuildList contains the user-selected index(in USER_DEFINED mode).
+ final List<String> rebuildList = rebuildConfig.getRebuildList();
+ for (final Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
+ {
+ final AttributeType attributeType = mapEntry.getKey();
+ final AttributeIndex attributeIndex = mapEntry.getValue();
+ if (rebuildConfig.getRebuildMode() == RebuildMode.ALL
+ || rebuildConfig.getRebuildMode() == RebuildMode.DEGRADED)
+ {
+ // Get all existing indexes for all && degraded mode.
+ rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
+ }
+ else if (!rebuildList.isEmpty())
+ {
+ // Get indexes for user defined index.
+ for (final String index : rebuildList)
+ {
+ if (attributeType.getNameOrOID().toLowerCase().equals(index.toLowerCase()))
+ {
+ rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
+ }
+ }
+ }
+ }
+ }
+
+ private void rebuildAttributeIndexes(final AttributeIndex attrIndex,
+ final AttributeType attrType, final boolean onlyDegraded)
+ throws DatabaseException
+ {
+ fillIndexMap(attrType, attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, onlyDegraded);
+ fillIndexMap(attrType, attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, onlyDegraded);
+ fillIndexMap(attrType, attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, onlyDegraded);
+ fillIndexMap(attrType, attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, onlyDegraded);
+ fillIndexMap(attrType, attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, onlyDegraded);
+
+ final Map<String, Collection<Index>> extensibleMap = attrIndex.getExtensibleIndexes();
+ if (!extensibleMap.isEmpty())
+ {
+ final Collection<Index> subIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SUBSTRING);
+ fillIndexMap(attrType, subIndexes, ImportIndexType.EX_SUBSTRING, onlyDegraded);
+ final Collection<Index> sharedIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SHARED);
+ fillIndexMap(attrType, sharedIndexes, ImportIndexType.EX_SHARED, onlyDegraded);
+ }
+ }
+
+ private void fillIndexMap(final AttributeType attrType, final Collection<Index> indexes,
+ final ImportIndexType importIndexType, final boolean onlyDegraded)
+ {
+ if (indexes != null && !indexes.isEmpty())
+ {
+ final List<Index> mutableCopy = new LinkedList<Index>(indexes);
+ for (final Iterator<Index> it = mutableCopy.iterator(); it.hasNext();)
+ {
+ final Index sharedIndex = it.next();
+ if (!onlyDegraded || !sharedIndex.isTrusted())
+ {
+ if (!rebuildConfig.isClearDegradedState() || sharedIndex.getRecordCount() == 0)
+ {
+ putInIdContainerMap(sharedIndex);
+ }
+ }
+ else
+ {
+ // This index is not a candidate for rebuilding.
+ it.remove();
+ }
+ }
+ if (!mutableCopy.isEmpty())
+ {
+ extensibleIndexMap.put(new IndexKey(attrType, importIndexType, 0), mutableCopy);
+ }
+ }
+ }
+
+ private void fillIndexMap(final AttributeType attrType, final Index partialAttrIndex,
+ final ImportIndexType importIndexType, final boolean onlyDegraded)
+ {
+ if (partialAttrIndex != null
+ && (!onlyDegraded || !partialAttrIndex.isTrusted())
+ && (!rebuildConfig.isClearDegradedState() || partialAttrIndex.getRecordCount() == 0))
+ {
+ putInIdContainerMap(partialAttrIndex);
+ final IndexKey indexKey = new IndexKey(attrType, importIndexType, partialAttrIndex.getIndexEntryLimit());
+ indexMap.put(indexKey, partialAttrIndex);
+ }
+ }
+
+ private void clearIndexes(boolean onlyDegraded) throws DatabaseException
+ {
+ // Clears all the entry's container databases which are containing the indexes
+ if (!onlyDegraded)
+ {
+ // dn2uri does not have a trusted status.
+ entryContainer.clearDatabase(entryContainer.getDN2URI());
+ }
+
+ if (!onlyDegraded
+ || !entryContainer.getID2Children().isTrusted()
+ || !entryContainer.getID2Subtree().isTrusted())
+ {
+ entryContainer.clearDatabase(entryContainer.getDN2ID());
+ entryContainer.clearDatabase(entryContainer.getID2Children());
+ entryContainer.clearDatabase(entryContainer.getID2Subtree());
+ }
+
+ if (!indexMap.isEmpty())
+ {
+ for (final Map.Entry<IndexKey, Index> mapEntry : indexMap.entrySet())
+ {
+ if (!onlyDegraded || !mapEntry.getValue().isTrusted())
+ {
+ entryContainer.clearDatabase(mapEntry.getValue());
+ }
+ }
+ }
+
+ if (!extensibleIndexMap.isEmpty())
+ {
+ for (final Collection<Index> subIndexes : extensibleIndexMap.values())
+ {
+ if (subIndexes != null)
+ {
+ for (final Index subIndex : subIndexes)
+ {
+ entryContainer.clearDatabase(subIndex);
+ }
+ }
+ }
+ }
+
+ for (final VLVIndex vlvIndex : entryContainer.getVLVIndexes())
+ {
+ if (!onlyDegraded || !vlvIndex.isTrusted())
+ {
+ entryContainer.clearDatabase(vlvIndex);
+ }
+ }
+ }
+
+ private void setRebuildListIndexesTrusted(boolean trusted)
+ throws JebException
+ {
+ try
+ {
+ if (dn2id != null)
+ {
+ EntryContainer ec = suffix.getEntryContainer();
+ ec.getID2Children().setTrusted(null, trusted);
+ ec.getID2Subtree().setTrusted(null, trusted);
+ }
+ if (!indexMap.isEmpty())
+ {
+ for (Map.Entry<IndexKey, Index> mapEntry : indexMap.entrySet())
+ {
+ Index index = mapEntry.getValue();
+ index.setTrusted(null, trusted);
+ }
+ }
+ if (!vlvIndexes.isEmpty())
+ {
+ for (VLVIndex vlvIndex : vlvIndexes)
+ {
+ vlvIndex.setTrusted(null, trusted);
+ }
+ }
+ if (!extensibleIndexMap.isEmpty())
+ {
+ for (Collection<Index> subIndexes : extensibleIndexMap.values())
+ {
+ if (subIndexes != null)
+ {
+ for (Index subIndex : subIndexes)
+ {
+ subIndex.setTrusted(null, trusted);
+ }
+ }
+ }
+ }
+ }
+ catch (DatabaseException ex)
+ {
+ throw new JebException(NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage()));
+ }
+ }
+
+ private void phaseOne() throws DatabaseException, InterruptedException,
+ ExecutionException
+ {
+ initializeIndexBuffers();
+ Timer timer = scheduleAtFixedRate(new RebuildFirstPhaseProgressTask());
+ scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
+ bufferSortService = Executors.newFixedThreadPool(threadCount);
+ ExecutorService rebuildIndexService = Executors.newFixedThreadPool(threadCount);
+ List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
+ for (int i = 0; i < threadCount; i++)
+ {
+ tasks.add(this);
+ }
+ List<Future<Void>> results = rebuildIndexService.invokeAll(tasks);
+ getAll(results);
+ stopScratchFileWriters();
+ getAll(scratchFileWriterFutures);
+
+ // Try to clear as much memory as possible.
+ shutdownAll(rebuildIndexService, bufferSortService, scratchFileWriterService);
+ timer.cancel();
+
+ clearAll(tasks, results, scratchFileWriterList, scratchFileWriterFutures, freeBufferQueue);
+ indexKeyQueueMap.clear();
+ }
+
+ private void phaseTwo() throws InterruptedException, ExecutionException
+ {
+ final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask(entriesProcessed.get()));
+ try
+ {
+ processIndexFiles();
+ }
+ finally
+ {
+ timer.cancel();
+ }
+ }
+
+ private Timer scheduleAtFixedRate(TimerTask task)
+ {
+ final Timer timer = new Timer();
+ timer.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL);
+ return timer;
+ }
+
+ private int getIndexCount() throws ConfigException, JebException,
+ InitializationException
+ {
+ switch (rebuildConfig.getRebuildMode())
+ {
+ case ALL:
+ return getTotalIndexCount(cfg);
+ case DEGRADED:
+ // FIXME: since the environment is not started we cannot determine which
+ // indexes are degraded. As a workaround, be conservative and assume all
+ // indexes need rebuilding.
+ return getTotalIndexCount(cfg);
+ default:
+ return getRebuildListIndexCount(cfg);
+ }
+ }
+
+ private int getRebuildListIndexCount(LocalDBBackendCfg cfg)
+ throws JebException, ConfigException, InitializationException
+ {
+ final List<String> rebuildList = rebuildConfig.getRebuildList();
+ if (rebuildList.isEmpty())
+ {
+ return 0;
+ }
+
+ int indexCount = 0;
+ for (String index : rebuildList)
+ {
+ final String lowerName = index.toLowerCase();
+ if ("dn2id".equals(lowerName))
+ {
+ indexCount += 3;
+ }
+ else if ("dn2uri".equals(lowerName))
+ {
+ indexCount++;
+ }
+ else if (lowerName.startsWith("vlv."))
+ {
+ if (lowerName.length() < 5)
+ {
+ throw new JebException(ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName));
+ }
+ indexCount++;
+ }
+ else if ("id2subtree".equals(lowerName)
+ || "id2children".equals(lowerName))
+ {
+ throw attributeIndexNotConfigured(index);
+ }
+ else
+ {
+ final String[] attrIndexParts = lowerName.split("\\.");
+ if (attrIndexParts.length <= 0 || attrIndexParts.length > 3)
+ {
+ throw attributeIndexNotConfigured(index);
+ }
+ AttributeType attrType = DirectoryServer.getAttributeType(attrIndexParts[0]);
+ if (attrType == null)
+ {
+ throw attributeIndexNotConfigured(index);
+ }
+ if (attrIndexParts.length != 1)
+ {
+ final String indexType = attrIndexParts[1];
+ if (attrIndexParts.length == 2)
+ {
+ if ("presence".equals(indexType)
+ || "equality".equals(indexType)
+ || "ordering".equals(indexType)
+ || "substring".equals(indexType)
+ || "approximate".equals(indexType))
+ {
+ indexCount++;
+ }
+ else
+ {
+ throw attributeIndexNotConfigured(index);
+ }
+ }
+ else // attrIndexParts.length == 3
+ {
+ if (!findExtensibleMatchingRule(cfg, indexType + "." + attrIndexParts[2]))
+ {
+ throw attributeIndexNotConfigured(index);
+ }
+ indexCount++;
+ }
+ }
+ else
+ {
+ boolean found = false;
+ for (final String idx : cfg.listLocalDBIndexes())
+ {
+ if (idx.equalsIgnoreCase(index))
+ {
+ found = true;
+ final LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
+ indexCount += getAttributeIndexCount(indexCfg.getIndexType(),
+ PRESENCE, EQUALITY, ORDERING, SUBSTRING, APPROXIMATE);
+ indexCount += getExtensibleIndexCount(indexCfg);
+ }
+ }
+ if (!found)
+ {
+ throw attributeIndexNotConfigured(index);
+ }
+ }
+ }
+ }
+ return indexCount;
+ }
+
+ private InitializationException attributeIndexNotConfigured(String index)
+ {
+ return new InitializationException(ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index));
+ }
+
+ private boolean findExtensibleMatchingRule(LocalDBBackendCfg cfg, String indexExRuleName) throws ConfigException
+ {
+ for (String idx : cfg.listLocalDBIndexes())
+ {
+ LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
+ if (indexCfg.getIndexType().contains(EXTENSIBLE))
+ {
+ for (String exRule : indexCfg.getIndexExtensibleMatchingRule())
+ {
+ if (exRule.equalsIgnoreCase(indexExRuleName))
+ {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private int getAttributeIndexCount(SortedSet<IndexType> indexTypes, IndexType... toFinds)
+ {
+ int result = 0;
+ for (IndexType toFind : toFinds)
+ {
+ if (indexTypes.contains(toFind))
+ {
+ result++;
+ }
+ }
+ return result;
+ }
+
+ private int getExtensibleIndexCount(LocalDBIndexCfg indexCfg)
+ {
+ int result = 0;
+ if (indexCfg.getIndexType().contains(EXTENSIBLE))
+ {
+ boolean shared = false;
+ for (final String exRule : indexCfg.getIndexExtensibleMatchingRule())
+ {
+ if (exRule.endsWith(".sub"))
+ {
+ result++;
+ }
+ else if (!shared)
+ {
+ shared = true;
+ result++;
+ }
+ }
+ }
+ return result;
+ }
+
+ private void processEntry(Entry entry, EntryID entryID)
+ throws DatabaseException, DirectoryException, JebException,
+ InterruptedException
+ {
+ if (dn2id != null)
+ {
+ processDN2ID(suffix, entry.getName(), entryID);
+ }
+ if (dn2uri != null)
+ {
+ processDN2URI(suffix, null, entry);
+ }
+ processIndexes(entry, entryID);
+ processExtensibleIndexes(entry, entryID);
+ processVLVIndexes(entry, entryID);
+ }
+
+ private void processVLVIndexes(Entry entry, EntryID entryID)
+ throws DatabaseException, JebException, DirectoryException
+ {
+ for (VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes())
+ {
+ Transaction transaction = null;
+ vlvIdx.addEntry(transaction, entryID, entry);
+ }
+ }
+
+ private void processExtensibleIndexes(Entry entry, EntryID entryID)
+ throws InterruptedException
+ {
+ for (Map.Entry<IndexKey, Collection<Index>> mapEntry :
+ this.extensibleIndexMap.entrySet())
+ {
+ IndexKey key = mapEntry.getKey();
+ AttributeType attrType = key.getAttributeType();
+ if (entry.hasAttribute(attrType))
+ {
+ AttributeIndex attributeIndex = entryContainer.getAttributeIndex(attrType);
+ IndexingOptions options = attributeIndex.getIndexingOptions();
+ for (Index index : mapEntry.getValue())
+ {
+ processAttribute(index, entry, entryID, options, key);
+ }
+ }
+ }
+ }
+
+ private void processIndexes(Entry entry, EntryID entryID)
+ throws DatabaseException, InterruptedException
+ {
+ for (Map.Entry<IndexKey, Index> mapEntry : indexMap.entrySet())
+ {
+ IndexKey key = mapEntry.getKey();
+ AttributeType attrType = key.getAttributeType();
+ if (entry.hasAttribute(attrType))
+ {
+ AttributeIndex attributeIndex = entryContainer.getAttributeIndex(attrType);
+ IndexingOptions options = attributeIndex.getIndexingOptions();
+ Index index = mapEntry.getValue();
+ processAttribute(index, entry, entryID, options,
+ new IndexKey(attrType, key.getIndexType(), index.getIndexEntryLimit()));
+ }
+ }
+ }
+
+ /**
+ * Return the number of entries processed by the rebuild manager.
+ *
+ * @return The number of entries processed.
+ */
+ public long getEntriesProcess()
+ {
+ return this.entriesProcessed.get();
+ }
+
+ /**
+ * Return the total number of entries to process by the rebuild manager.
+ *
+ * @return The total number for entries to process.
+ */
+ public long getTotalEntries()
+ {
+ return this.totalEntries;
+ }
+
+ @Override
+ public void diskLowThresholdReached(DiskSpaceMonitor monitor)
+ {
+ diskFullThresholdReached(monitor);
+ }
+
+ @Override
+ public void diskFullThresholdReached(DiskSpaceMonitor monitor)
+ {
+ isCanceled = true;
+ logger.error(ERR_REBUILD_INDEX_LACK_DISK, monitor.getDirectory().getPath(),
+ monitor.getFreeSpace(), monitor.getLowThreshold());
+ }
+
+ @Override
+ public void diskSpaceRestored(DiskSpaceMonitor monitor)
+ {
+ // Do nothing
+ }
+ }
+
+ /**
+ * This class reports progress of rebuild index processing at fixed intervals.
+ */
+ private class RebuildFirstPhaseProgressTask extends TimerTask
+ {
+ /**
+ * The number of records that had been processed at the time of the previous
+ * progress report.
+ */
+ private long previousProcessed;
+ /** The time in milliseconds of the previous progress report. */
+ private long previousTime;
+ /** The environment statistics at the time of the previous report. */
+ private EnvironmentStats prevEnvStats;
+
+ /**
+ * Create a new rebuild index progress task.
+ *
+ * @throws DatabaseException
+ * If an error occurred while accessing the JE database.
+ */
+ public RebuildFirstPhaseProgressTask() throws DatabaseException
+ {
+ previousTime = System.currentTimeMillis();
+ prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig());
+ }
+
+ /**
+ * The action to be performed by this timer task.
+ */
+ @Override
+ public void run()
+ {
+ long latestTime = System.currentTimeMillis();
+ long deltaTime = latestTime - previousTime;
+
+ if (deltaTime == 0)
+ {
+ return;
+ }
+ long entriesProcessed = rebuildManager.getEntriesProcess();
+ long deltaCount = entriesProcessed - previousProcessed;
+ float rate = 1000f * deltaCount / deltaTime;
+ float completed = 0;
+ if (rebuildManager.getTotalEntries() > 0)
+ {
+ completed = 100f * entriesProcessed / rebuildManager.getTotalEntries();
+ }
+ logger.info(NOTE_JEB_REBUILD_PROGRESS_REPORT, completed, entriesProcessed,
+ rebuildManager.getTotalEntries(), rate);
+ try
+ {
+ Runtime runtime = Runtime.getRuntime();
+ long freeMemory = runtime.freeMemory() / MB;
+ EnvironmentStats envStats =
+ rootContainer.getEnvironmentStats(new StatsConfig());
+ long nCacheMiss = envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
+
+ float cacheMissRate = 0;
+ if (deltaCount > 0)
+ {
+ cacheMissRate = nCacheMiss / (float) deltaCount;
+ }
+ logger.info(NOTE_JEB_REBUILD_CACHE_AND_MEMORY_REPORT, freeMemory, cacheMissRate);
+ prevEnvStats = envStats;
+ }
+ catch (DatabaseException e)
+ {
+ // Unlikely to happen and not critical.
+ }
+ previousProcessed = entriesProcessed;
+ previousTime = latestTime;
+ }
+ }
+
+ /**
+ * This class reports progress of first phase of import processing at fixed
+ * intervals.
+ */
+ private final class FirstPhaseProgressTask extends TimerTask
+ {
+ /**
+ * The number of entries that had been read at the time of the previous
+ * progress report.
+ */
+ private long previousCount;
+ /** The time in milliseconds of the previous progress report. */
+ private long previousTime;
+ /** The environment statistics at the time of the previous report. */
+ private EnvironmentStats previousStats;
+ /** Determines if eviction has been detected. */
+ private boolean evicting;
+ /** Entry count when eviction was detected. */
+ private long evictionEntryCount;
+
+ /** Create a new import progress task. */
+ public FirstPhaseProgressTask()
+ {
+ previousTime = System.currentTimeMillis();
+ try
+ {
+ previousStats = rootContainer.getEnvironmentStats(new StatsConfig());
+ }
+ catch (DatabaseException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** The action to be performed by this timer task. */
+ @Override
+ public void run()
+ {
+ long latestCount = reader.getEntriesRead() + 0;
+ long deltaCount = latestCount - previousCount;
+ long latestTime = System.currentTimeMillis();
+ long deltaTime = latestTime - previousTime;
+ if (deltaTime == 0)
+ {
+ return;
+ }
+ long entriesRead = reader.getEntriesRead();
+ long entriesIgnored = reader.getEntriesIgnored();
+ long entriesRejected = reader.getEntriesRejected();
+ float rate = 1000f * deltaCount / deltaTime;
+ logger.info(NOTE_JEB_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate);
+ try
+ {
+ Runtime runTime = Runtime.getRuntime();
+ long freeMemory = runTime.freeMemory() / MB;
+ EnvironmentStats environmentStats;
+
+ //If first phase skip DN validation is specified use the root container
+ //stats, else use the temporary environment stats.
+ if (skipDNValidation)
+ {
+ environmentStats =
+ rootContainer.getEnvironmentStats(new StatsConfig());
+ }
+ else
+ {
+ environmentStats = tmpEnv.getEnvironmentStats(new StatsConfig());
+ }
+ long nCacheMiss =
+ environmentStats.getNCacheMiss() - previousStats.getNCacheMiss();
+
+ float cacheMissRate = 0;
+ if (deltaCount > 0)
+ {
+ cacheMissRate = nCacheMiss / (float) deltaCount;
+ }
+ logger.info(NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT, freeMemory, cacheMissRate);
+ long evictPasses = environmentStats.getNEvictPasses();
+ long evictNodes = environmentStats.getNNodesExplicitlyEvicted();
+ long evictBinsStrip = environmentStats.getNBINsStripped();
+ long cleanerRuns = environmentStats.getNCleanerRuns();
+ long cleanerDeletions = environmentStats.getNCleanerDeletions();
+ long cleanerEntriesRead = environmentStats.getNCleanerEntriesRead();
+ long cleanerINCleaned = environmentStats.getNINsCleaned();
+ long checkPoints = environmentStats.getNCheckpoints();
+ if (evictPasses != 0)
+ {
+ if (!evicting)
+ {
+ evicting = true;
+ evictionEntryCount = reader.getEntriesRead();
+ logger.info(NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED, evictionEntryCount);
+ }
+ logger.info(NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS, evictPasses,
+ evictNodes, evictBinsStrip);
+ }
+ if (cleanerRuns != 0)
+ {
+ logger.info(NOTE_JEB_IMPORT_LDIF_CLEANER_STATS, cleanerRuns,
+ cleanerDeletions, cleanerEntriesRead, cleanerINCleaned);
+ }
+ if (checkPoints > 1)
+ {
+ logger.info(NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS, checkPoints);
+ }
+ previousStats = environmentStats;
+ }
+ catch (DatabaseException e)
+ {
+ // Unlikely to happen and not critical.
+ }
+ previousCount = latestCount;
+ previousTime = latestTime;
+ }
+ }
+
+ /**
+ * This class reports progress of the second phase of import processing at
+ * fixed intervals.
+ */
+ private class SecondPhaseProgressTask extends TimerTask
+ {
+ /**
+ * The number of entries that had been read at the time of the previous
+ * progress report.
+ */
+ private long previousCount;
+ /** The time in milliseconds of the previous progress report. */
+ private long previousTime;
+ /** The environment statistics at the time of the previous report. */
+ private EnvironmentStats previousStats;
+ /** Determines if eviction has been detected. */
+ private boolean evicting;
+ private long latestCount;
+
+ /**
+ * Create a new import progress task.
+ *
+ * @param latestCount
+ * The latest count of entries processed in phase one.
+ */
+ public SecondPhaseProgressTask(long latestCount)
+ {
+ previousTime = System.currentTimeMillis();
+ this.latestCount = latestCount;
+ try
+ {
+ previousStats = rootContainer.getEnvironmentStats(new StatsConfig());
+ }
+ catch (DatabaseException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** The action to be performed by this timer task. */
+ @Override
+ public void run()
+ {
+ long deltaCount = latestCount - previousCount;
+ long latestTime = System.currentTimeMillis();
+ long deltaTime = latestTime - previousTime;
+ if (deltaTime == 0)
+ {
+ return;
+ }
+ try
+ {
+ Runtime runTime = Runtime.getRuntime();
+ long freeMemory = runTime.freeMemory() / MB;
+ EnvironmentStats environmentStats =
+ rootContainer.getEnvironmentStats(new StatsConfig());
+ long nCacheMiss =
+ environmentStats.getNCacheMiss() - previousStats.getNCacheMiss();
+
+ float cacheMissRate = 0;
+ if (deltaCount > 0)
+ {
+ cacheMissRate = nCacheMiss / (float) deltaCount;
+ }
+ logger.info(NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT, freeMemory, cacheMissRate);
+ long evictPasses = environmentStats.getNEvictPasses();
+ long evictNodes = environmentStats.getNNodesExplicitlyEvicted();
+ long evictBinsStrip = environmentStats.getNBINsStripped();
+ long cleanerRuns = environmentStats.getNCleanerRuns();
+ long cleanerDeletions = environmentStats.getNCleanerDeletions();
+ long cleanerEntriesRead = environmentStats.getNCleanerEntriesRead();
+ long cleanerINCleaned = environmentStats.getNINsCleaned();
+ long checkPoints = environmentStats.getNCheckpoints();
+ if (evictPasses != 0)
+ {
+ if (!evicting)
+ {
+ evicting = true;
+ }
+ logger.info(NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS, evictPasses,
+ evictNodes, evictBinsStrip);
+ }
+ if (cleanerRuns != 0)
+ {
+ logger.info(NOTE_JEB_IMPORT_LDIF_CLEANER_STATS, cleanerRuns,
+ cleanerDeletions, cleanerEntriesRead, cleanerINCleaned);
+ }
+ if (checkPoints > 1)
+ {
+ logger.info(NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS, checkPoints);
+ }
+ previousStats = environmentStats;
+ }
+ catch (DatabaseException e)
+ {
+ // Unlikely to happen and not critical.
+ }
+ previousCount = latestCount;
+ previousTime = latestTime;
+
+ //Do DN index managers first.
+ for (IndexManager indexMgrDN : DNIndexMgrList)
+ {
+ indexMgrDN.printStats(deltaTime);
+ }
+ //Do non-DN index managers.
+ for (IndexManager indexMgr : indexMgrList)
+ {
+ indexMgr.printStats(deltaTime);
+ }
+ }
+ }
+
+ /**
+ * A class to hold information about the entry determined by the LDIF reader.
+ * Mainly the suffix the entry belongs under and the ID assigned to it by the
+ * reader.
+ */
+ public class EntryInformation
+ {
+ private EntryID entryID;
+ private Suffix suffix;
+
+ /**
+ * Return the suffix associated with the entry.
+ *
+ * @return Entry's suffix instance;
+ */
+ public Suffix getSuffix()
+ {
+ return suffix;
+ }
+
+ /**
+ * Set the suffix instance associated with the entry.
+ *
+ * @param suffix
+ * The suffix associated with the entry.
+ */
+ public void setSuffix(Suffix suffix)
+ {
+ this.suffix = suffix;
+ }
+
+ /**
+ * Set the entry's ID.
+ *
+ * @param entryID
+ * The entry ID to set the entry ID to.
+ */
+ public void setEntryID(EntryID entryID)
+ {
+ this.entryID = entryID;
+ }
+
+ /**
+ * Return the entry ID associated with the entry.
+ *
+ * @return The entry ID associated with the entry.
+ */
+ public EntryID getEntryID()
+ {
+ return entryID;
+ }
+ }
+
+ /**
+ * This class defines the individual index type available.
+ */
+ private enum ImportIndexType
+ {
+ /** The DN index type. */
+ DN,
+ /** The equality index type. */
+ EQUALITY,
+ /** The presence index type. */
+ PRESENCE,
+ /** The sub-string index type. */
+ SUBSTRING,
+ /** The ordering index type. */
+ ORDERING,
+ /** The approximate index type. */
+ APPROXIMATE,
+ /** The extensible sub-string index type. */
+ EX_SUBSTRING,
+ /** The extensible shared index type. */
+ EX_SHARED,
+ /** The vlv index type. */
+ VLV
+ }
+
+ /**
+ * This class is used as an index key for hash maps that need to process
+ * multiple suffix index elements into a single queue and/or maps based on
+ * both attribute type and index type (ie., cn.equality, sn.equality,...).
+ */
+ public class IndexKey
+ {
+
+ private final AttributeType attributeType;
+ private final ImportIndexType indexType;
+ private final int entryLimit;
+
+ /**
+ * Create index key instance using the specified attribute type, index type
+ * and index entry limit.
+ *
+ * @param attributeType
+ * The attribute type.
+ * @param indexType
+ * The index type.
+ * @param entryLimit
+ * The entry limit for the index.
+ */
+ private IndexKey(AttributeType attributeType, ImportIndexType indexType, int entryLimit)
+ {
+ this.attributeType = attributeType;
+ this.indexType = indexType;
+ this.entryLimit = entryLimit;
+ }
+
+ /**
+ * An equals method that uses both the attribute type and the index type.
+ * Only returns {@code true} if the attribute type and index type are equal.
+ *
+ * @param obj
+ * the object to compare.
+ * @return {@code true} if the objects are equal, or {@code false} if they
+ * are not.
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj instanceof IndexKey)
+ {
+ IndexKey oKey = (IndexKey) obj;
+ if (attributeType.equals(oKey.getAttributeType())
+ && indexType.equals(oKey.getIndexType()))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * A hash code method that adds the hash codes of the attribute type and
+ * index type and returns that value.
+ *
+ * @return The combined hash values of attribute type hash code and the
+ * index type hash code.
+ */
+ @Override
+ public int hashCode()
+ {
+ return attributeType.hashCode() + indexType.hashCode();
+ }
+
+ /**
+ * Return the attribute type.
+ *
+ * @return The attribute type.
+ */
+ public AttributeType getAttributeType()
+ {
+ return attributeType;
+ }
+
+ /**
+ * Return the index type.
+ *
+ * @return The index type.
+ */
+ public ImportIndexType getIndexType()
+ {
+ return indexType;
+ }
+
+ /**
+ * Return the index key name, which is the attribute type primary name, a
+ * period, and the index type name. Used for building file names and
+ * progress output.
+ *
+ * @return The index key name.
+ */
+ public String getName()
+ {
+ return attributeType.getPrimaryName() + "."
+ + StaticUtils.toLowerCase(indexType.name());
+ }
+
+ /**
+ * Return the entry limit associated with the index.
+ *
+ * @return The entry limit.
+ */
+ public int getEntryLimit()
+ {
+ return entryLimit;
+ }
+ }
+
+ /**
+ * The temporary environment will be shared when multiple suffixes are being
+ * processed. This interface is used by those suffix instance to do parental
+ * checking of the DN cache.
+ */
+ public static interface DNCache
+ {
+
+ /**
+ * Returns {@code true} if the specified DN is contained in the DN cache, or
+ * {@code false} otherwise.
+ *
+ * @param dn
+ * The DN to check the presence of.
+ * @return {@code true} if the cache contains the DN, or {@code false} if it
+ * is not.
+ * @throws DatabaseException
+ * If an error occurs reading the database.
+ */
+ boolean contains(DN dn) throws DatabaseException;
+ }
+
+ /**
+ * Temporary environment used to check DN's when DN validation is performed
+ * during phase one processing. It is deleted after phase one processing.
+ */
+ private final class TmpEnv implements DNCache
+ {
+ private final String envPath;
+ private final Environment environment;
+ private static final String DB_NAME = "dn_cache";
+ private final Database dnCache;
+
+ /**
+ * Create a temporary DB environment and database to be used as a cache of
+ * DNs when DN validation is performed in phase one processing.
+ *
+ * @param envPath
+ * The file path to create the environment under.
+ * @throws DatabaseException
+ * If an error occurs either creating the environment or the DN
+ * database.
+ */
+ private TmpEnv(File envPath) throws DatabaseException
+ {
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setConfigParam(ENV_RUN_CLEANER, "true");
+ envConfig.setReadOnly(false);
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(false);
+ envConfig.setConfigParam(ENV_IS_LOCKING, "true");
+ envConfig.setConfigParam(ENV_RUN_CHECKPOINTER, "false");
+ envConfig.setConfigParam(EVICTOR_LRU_ONLY, "false");
+ envConfig.setConfigParam(EVICTOR_NODES_PER_SCAN, "128");
+ envConfig.setConfigParam(MAX_MEMORY, Long.toString(tmpEnvCacheSize));
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setAllowCreate(true);
+ dbConfig.setTransactional(false);
+ dbConfig.setTemporary(true);
+ environment = new Environment(envPath, envConfig);
+ dnCache = environment.openDatabase(null, DB_NAME, dbConfig);
+ this.envPath = envPath.getPath();
+ }
+
+ private static final long FNV_INIT = 0xcbf29ce484222325L;
+ private static final long FNV_PRIME = 0x100000001b3L;
+
+ /** Hash the DN bytes. Uses the FNV-1a hash. */
+ private byte[] hashCode(byte[] b)
+ {
+ long hash = FNV_INIT;
+ for (byte aB : b)
+ {
+ hash ^= aB;
+ hash *= FNV_PRIME;
+ }
+ return JebFormat.entryIDToDatabase(hash);
+ }
+
+ /**
+ * Shutdown the temporary environment.
+ *
+ * @throws JebException
+ * If error occurs.
+ */
+ private void shutdown() throws JebException
+ {
+ dnCache.close();
+ environment.close();
+ EnvManager.removeFiles(envPath);
+ }
+
+ /**
+ * Insert the specified DN into the DN cache. It will return {@code true} if
+ * the DN does not already exist in the cache and was inserted, or
+ * {@code false} if the DN exists already in the cache.
+ *
+ * @param dn
+ * The DN to insert in the cache.
+ * @param val
+ * A database entry to use in the insert.
+ * @param key
+ * A database entry to use in the insert.
+ * @return {@code true} if the DN was inserted in the cache, or
+ * {@code false} if the DN exists in the cache already and could not
+ * be inserted.
+ * @throws JebException
+ * If an error occurs accessing the database.
+ */
+ private boolean insert(DN dn, DatabaseEntry val, DatabaseEntry key)
+ throws JebException
+ {
+ // Use a compact representation for key
+ byte[] dnBytesForKey = dn.toIrreversibleNormalizedByteString().toByteArray();
+ key.setData(hashCode(dnBytesForKey));
+
+ // Use a reversible representation for value
+ byte[] dnBytesForValue = StaticUtils.getBytes(dn.toString());
+ int len = PackedInteger.getWriteIntLength(dnBytesForValue.length);
+ byte[] dataBytes = new byte[dnBytesForValue.length + len];
+ int pos = PackedInteger.writeInt(dataBytes, 0, dnBytesForValue.length);
+ System.arraycopy(dnBytesForValue, 0, dataBytes, pos, dnBytesForValue.length);
+ val.setData(dataBytes);
+
+ return insert(key, val, dnBytesForValue);
+ }
+
+ private boolean insert(DatabaseEntry key, DatabaseEntry val, byte[] dnBytesForValue)
+ throws JebException
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = dnCache.openCursor(null, CursorConfig.DEFAULT);
+ OperationStatus status = cursor.putNoOverwrite(key, val);
+ if (status == OperationStatus.KEYEXIST)
+ {
+ DatabaseEntry dns = new DatabaseEntry();
+ status = cursor.getSearchKey(key, dns, LockMode.RMW);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new JebException(LocalizableMessage.raw("Search DN cache failed."));
+ }
+ if (!isDNMatched(dns.getData(), dnBytesForValue))
+ {
+ addDN(dns.getData(), cursor, dnBytesForValue);
+ return true;
+ }
+ return false;
+ }
+ return true;
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+
+ /** Add the DN to the DNs as because of a hash collision. */
+ private void addDN(byte[] readDnBytes, Cursor cursor, byte[] dnBytesForValue) throws JebException
+ {
+ int pLen = PackedInteger.getWriteIntLength(dnBytesForValue.length);
+ int totLen = readDnBytes.length + pLen + dnBytesForValue.length;
+ byte[] newRec = new byte[totLen];
+ System.arraycopy(readDnBytes, 0, newRec, 0, readDnBytes.length);
+ int pos = PackedInteger.writeInt(newRec, readDnBytes.length, dnBytesForValue.length);
+ System.arraycopy(dnBytesForValue, 0, newRec, pos, dnBytesForValue.length);
+ DatabaseEntry newVal = new DatabaseEntry(newRec);
+ OperationStatus status = cursor.putCurrent(newVal);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new JebException(LocalizableMessage.raw("Add of DN to DN cache failed."));
+ }
+ }
+
+ /** Return true if the specified DN is in the DNs saved as a result of hash collisions. */
+ private boolean isDNMatched(byte[] readDnBytes, byte[] dnBytes)
+ {
+ int pos = 0;
+ while (pos < readDnBytes.length)
+ {
+ int pLen = PackedInteger.getReadIntLength(readDnBytes, pos);
+ int len = PackedInteger.readInt(readDnBytes, pos);
+ if (indexComparator.compare(readDnBytes, pos + pLen, len, dnBytes, dnBytes.length) == 0)
+ {
+ return true;
+ }
+ pos += pLen + len;
+ }
+ return false;
+ }
+
+ /**
+ * Check if the specified DN is contained in the temporary DN cache.
+ *
+ * @param dn
+ * A DN check for.
+ * @return {@code true} if the specified DN is in the temporary DN cache, or
+ * {@code false} if it is not.
+ */
+ @Override
+ public boolean contains(DN dn)
+ {
+ Cursor cursor = null;
+ DatabaseEntry key = new DatabaseEntry();
+ byte[] dnBytesForKey = dn.toIrreversibleNormalizedByteString().toByteArray();
+ key.setData(hashCode(dnBytesForKey));
+ try
+ {
+ cursor = dnCache.openCursor(null, CursorConfig.DEFAULT);
+ DatabaseEntry dns = new DatabaseEntry();
+ OperationStatus status = cursor.getSearchKey(key, dns, LockMode.DEFAULT);
+ byte[] dnBytesForValue = StaticUtils.getBytes(dn.toString());
+ return status == OperationStatus.SUCCESS && isDNMatched(dns.getData(), dnBytesForValue);
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+
+ /**
+ * Return temporary environment stats.
+ *
+ * @param statsConfig
+ * A stats configuration instance.
+ * @return Environment stats.
+ * @throws DatabaseException
+ * If an error occurs retrieving the stats.
+ */
+ private EnvironmentStats getEnvironmentStats(StatsConfig statsConfig)
+ throws DatabaseException
+ {
+ return environment.getStats(statsConfig);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void diskLowThresholdReached(DiskSpaceMonitor monitor)
+ {
+ diskFullThresholdReached(monitor);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void diskFullThresholdReached(DiskSpaceMonitor monitor)
+ {
+ isCanceled = true;
+ Arg3<Object, Number, Number> argMsg = !isPhaseOneDone
+ ? ERR_IMPORT_LDIF_LACK_DISK_PHASE_ONE
+ : ERR_IMPORT_LDIF_LACK_DISK_PHASE_TWO;
+ logger.error(argMsg.get(monitor.getDirectory().getPath(), monitor.getFreeSpace(), monitor.getLowThreshold()));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void diskSpaceRestored(DiskSpaceMonitor monitor)
+ {
+ // Do nothing.
+ }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/Index.java b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/Index.java
index 74fdc95..6aafb58 100644
--- a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/Index.java
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/Index.java
@@ -38,7 +38,6 @@
import org.forgerock.opendj.ldap.ConditionResult;
import org.forgerock.opendj.ldap.spi.IndexingOptions;
import org.opends.server.backends.jeb.IndexBuffer.BufferedIndexValues;
-import org.opends.server.backends.jeb.importLDIF.ImportIDSet;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.Modification;
diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/IndexInputBuffer.java b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/IndexInputBuffer.java
new file mode 100644
index 0000000..63db04c
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/IndexInputBuffer.java
@@ -0,0 +1,403 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2010 Sun Microsystems, Inc.
+ * Portions Copyright 2012-2015 ForgeRock AS.
+ */
+package org.opends.server.backends.jeb;
+
+import static org.opends.messages.JebMessages.ERR_JEB_IMPORT_BUFFER_IO_ERROR;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.opends.server.backends.jeb.Importer.IndexManager;
+
+import com.sleepycat.util.PackedInteger;
+
+/**
+ * The buffer class is used to process a buffer from the temporary index files
+ * during phase 2 processing.
+ */
+final class IndexInputBuffer implements Comparable<IndexInputBuffer>
+{
+
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+ private final IndexManager indexMgr;
+ private final FileChannel channel;
+ private final long begin;
+ private final long end;
+ private final int id;
+
+ private long offset;
+ private final ByteBuffer cache;
+ private Integer indexID = null;
+ private ByteBuffer keyBuf = ByteBuffer.allocate(128);
+
+
+
+ /**
+ * Possible states while reading a record.
+ */
+ private enum RecordState
+ {
+ START, NEED_INSERT_ID_SET, NEED_DELETE_ID_SET
+ }
+
+
+
+ private RecordState recordState = RecordState.START;
+
+
+
+ /**
+ * Creates a new index input buffer.
+ *
+ * @param indexMgr
+ * The index manager.
+ * @param channel
+ * The index file channel.
+ * @param begin
+ * The position of the start of the buffer in the scratch file.
+ * @param end
+ * The position of the end of the buffer in the scratch file.
+ * @param id
+ * The index ID.
+ * @param cacheSize
+ * The cache size.
+ * @throws IOException
+ * If an IO error occurred when priming the cache.
+ */
+ public IndexInputBuffer(IndexManager indexMgr, FileChannel channel,
+ long begin, long end, int id, int cacheSize) throws IOException
+ {
+ this.indexMgr = indexMgr;
+ this.channel = channel;
+ this.begin = begin;
+ this.end = end;
+ this.offset = 0;
+ this.id = id;
+ this.cache = ByteBuffer.allocate(Math.max(cacheSize - 384, 256));
+
+ loadCache();
+ cache.flip();
+ keyBuf.flip();
+ }
+
+
+
+ private void loadCache() throws IOException
+ {
+ channel.position(begin + offset);
+ long leftToRead = end - (begin + offset);
+ long bytesToRead;
+ if (leftToRead < cache.remaining())
+ {
+ cache.limit((int) (cache.position() + leftToRead));
+ bytesToRead = (int) leftToRead;
+ }
+ else
+ {
+ bytesToRead = Math.min((end - offset), cache.remaining());
+ }
+ int bytesRead = 0;
+ while (bytesRead < bytesToRead)
+ {
+ bytesRead += channel.read(cache);
+ }
+ offset += bytesRead;
+ indexMgr.addBytesRead(bytesRead);
+ }
+
+
+
+ /**
+ * Returns {@code true} if this buffer has more data.
+ *
+ * @return {@code true} if this buffer has more data.
+ * @throws IOException
+ * If an IO error occurred.
+ */
+ public boolean hasMoreData() throws IOException
+ {
+ boolean ret = ((begin + offset) >= end);
+ return !(cache.remaining() == 0 && ret);
+ }
+
+
+
+ /**
+ * Returns the length of the next key.
+ *
+ * @return The length of the next key.
+ */
+ public int getKeyLen()
+ {
+ return keyBuf.limit();
+ }
+
+
+
+ /**
+ * Returns the next key.
+ *
+ * @param b
+ * A buffer into which the key should be added.
+ */
+ public void getKey(ByteBuffer b)
+ {
+ keyBuf.get(b.array(), 0, keyBuf.limit());
+ b.limit(keyBuf.limit());
+ }
+
+
+
+ /**
+ * Returns the index ID of the next record.
+ *
+ * @return The index ID of the next record.
+ */
+ public Integer getIndexID()
+ {
+ if (indexID == null)
+ {
+ try
+ {
+ getNextRecord();
+ }
+ catch (IOException ex)
+ {
+ logger.error(ERR_JEB_IMPORT_BUFFER_IO_ERROR, indexMgr
+ .getBufferFileName());
+ throw new RuntimeException(ex);
+ }
+ }
+ return indexID;
+ }
+
+
+
+ /**
+ * Reads the next record from the buffer, skipping any remaining data in the
+ * current record.
+ *
+ * @throws IOException
+ * If an IO error occurred.
+ */
+ public void getNextRecord() throws IOException
+ {
+ switch (recordState)
+ {
+ case START:
+ // Nothing to skip.
+ break;
+ case NEED_INSERT_ID_SET:
+ // The previous record's ID sets were not read, so skip them both.
+ mergeIDSet(null);
+ mergeIDSet(null);
+ break;
+ case NEED_DELETE_ID_SET:
+ // The previous record's delete ID set was not read, so skip it.
+ mergeIDSet(null);
+ break;
+ }
+
+ indexID = getInt();
+
+ ensureData(20);
+ byte[] ba = cache.array();
+ int p = cache.position();
+ int len = PackedInteger.getReadIntLength(ba, p);
+ int keyLen = PackedInteger.readInt(ba, p);
+ cache.position(p + len);
+ if (keyLen > keyBuf.capacity())
+ {
+ keyBuf = ByteBuffer.allocate(keyLen);
+ }
+ ensureData(keyLen);
+ keyBuf.clear();
+ cache.get(keyBuf.array(), 0, keyLen);
+ keyBuf.limit(keyLen);
+
+ recordState = RecordState.NEED_INSERT_ID_SET;
+ }
+
+
+
+ private int getInt() throws IOException
+ {
+ ensureData(4);
+ return cache.getInt();
+ }
+
+
+
+ /**
+ * Reads the next ID set from the record and merges it with the provided ID
+ * set.
+ *
+ * @param idSet
+ * The ID set to be merged.
+ * @throws IOException
+ * If an IO error occurred.
+ */
+ public void mergeIDSet(ImportIDSet idSet) throws IOException
+ {
+ if (recordState == RecordState.START)
+ {
+ throw new IllegalStateException();
+ }
+
+ ensureData(20);
+ int p = cache.position();
+ byte[] ba = cache.array();
+ int len = PackedInteger.getReadIntLength(ba, p);
+ int keyCount = PackedInteger.readInt(ba, p);
+ p += len;
+ cache.position(p);
+ for (int k = 0; k < keyCount; k++)
+ {
+ if (ensureData(9))
+ {
+ p = cache.position();
+ }
+ len = PackedInteger.getReadLongLength(ba, p);
+ long l = PackedInteger.readLong(ba, p);
+ p += len;
+ cache.position(p);
+
+ // idSet will be null if skipping.
+ if (idSet != null)
+ {
+ idSet.addEntryID(l);
+ }
+ }
+
+ switch (recordState)
+ {
+ case START:
+ throw new IllegalStateException();
+ case NEED_INSERT_ID_SET:
+ recordState = RecordState.NEED_DELETE_ID_SET;
+ break;
+ case NEED_DELETE_ID_SET:
+ recordState = RecordState.START;
+ break;
+ }
+ }
+
+
+
+ private boolean ensureData(int len) throws IOException
+ {
+ boolean ret = false;
+ if (cache.remaining() == 0)
+ {
+ cache.clear();
+ loadCache();
+ cache.flip();
+ ret = true;
+ }
+ else if (cache.remaining() < len)
+ {
+ cache.compact();
+ loadCache();
+ cache.flip();
+ ret = true;
+ }
+ return ret;
+ }
+
+
+
+ /**
+ * Compares this buffer with the provided key and index ID.
+ *
+ * @param cKey
+ * The key.
+ * @param cIndexID
+ * The index ID.
+ * @return A negative number if this buffer is less than the provided key and
+ * index ID, a positive number if this buffer is greater, or zero if
+ * it is the same.
+ */
+ int compare(ByteBuffer cKey, Integer cIndexID)
+ {
+ int returnCode, rc;
+ if (keyBuf.limit() == 0)
+ {
+ getIndexID();
+ }
+ rc = Importer.indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(),
+ cKey.array(), cKey.limit());
+ if (rc != 0)
+ {
+ returnCode = 1;
+ }
+ else
+ {
+ returnCode = (indexID.intValue() == cIndexID.intValue()) ? 0 : 1;
+ }
+ return returnCode;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public int compareTo(IndexInputBuffer o)
+ {
+ // used in remove.
+ if (this == o)
+ {
+ return 0;
+ }
+
+ if (keyBuf.limit() == 0)
+ {
+ getIndexID();
+ }
+
+ if (o.keyBuf.limit() == 0)
+ {
+ o.getIndexID();
+ }
+
+ byte[] oKey = o.keyBuf.array();
+ int oLen = o.keyBuf.limit();
+ int returnCode = Importer.indexComparator.compare(keyBuf.array(), 0,
+ keyBuf.limit(), oKey, oLen);
+ if (returnCode == 0)
+ {
+ returnCode = indexID.intValue() - o.getIndexID().intValue();
+ if (returnCode == 0)
+ {
+ returnCode = id - o.id;
+ }
+ }
+ return returnCode;
+ }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/IndexOutputBuffer.java b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/IndexOutputBuffer.java
new file mode 100644
index 0000000..1a7bc3e
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/IndexOutputBuffer.java
@@ -0,0 +1,999 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2009-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2013-2015 ForgeRock AS.
+ */
+package org.opends.server.backends.jeb;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.sleepycat.util.PackedInteger;
+
+/**
+ * This class represents a index buffer used to store the keys and entry IDs
+ * processed from the LDIF file during phase one of an import, or rebuild index
+ * process. Each key and ID is stored in a record in the buffer.
+ * <p>
+ * The records in the buffer are eventually sorted, based on the key, when the
+ * maximum size of the buffer is reached and no more records will fit into the
+ * buffer. The buffer is scheduled to be flushed to an index scratch file and
+ * then re-cycled by the import, or rebuild-index process.
+ * </p>
+ * <p>
+ * The structure of a record in the buffer is the following:
+ *
+ * <pre>
+ * +-------------+-------------+---------+---------+------------+-----------+
+ * | record size | INS/DEL bit | indexID | entryID | key length | key bytes |
+ * +-------------+-------------+---------+---------+------------+-----------+
+ * </pre>
+ *
+ * The record size is used for fast seeks to quickly "jump" over records.
+ * </p>
+ * <p>
+ * The records are packed as much as possible, to optimize the buffer space.<br>
+ * This class is not thread safe.
+ * </p>
+ */
+final class IndexOutputBuffer implements Comparable<IndexOutputBuffer> {
+
+ /** Enumeration used when sorting a buffer. */
+ private enum CompareOp {
+ LT, GT, LE, GE, EQ
+ }
+
+ /** The size of a Java int. A Java int is 32 bits, i.e. 4 bytes. */
+ private static final int INT_SIZE = 4;
+
+ /**
+ * The record overhead. In addition to entryID, key length and key bytes, the
+ * record overhead includes the indexID + INS/DEL bit
+ */
+ private static final int REC_OVERHEAD = INT_SIZE + 1;
+
+ /** Buffer records are either insert records or delete records. */
+ private static final byte DEL = 0, INS = 1;
+
+ /** The size of a buffer. */
+ private final int size;
+ /** Byte array holding the actual buffer data. */
+ private final byte buffer[];
+
+ /**
+ * Used to break a tie (keys equal) when the buffers are being merged
+ * for writing to the index scratch file.
+ */
+ private long id;
+
+ /** Temporary buffer used to store integer values. */
+ private final byte[] intBytes = new byte[INT_SIZE];
+
+ /** OffSet where next key is written. */
+ private int keyOffset;
+ /** OffSet where next value record is written. */
+ private int recordOffset;
+ /** Amount of bytes left in the buffer. */
+ private int bytesLeft;
+ /** Number of keys in the buffer. */
+ private int keys;
+ /** Used to iterate over the buffer when writing to a scratch file. */
+ private int position;
+
+ /** The comparator to use sort the keys. */
+ private ComparatorBuffer<byte[]> comparator;
+
+ /**
+ * Used to make sure that an instance of this class is put on the
+ * correct scratch file writer work queue for processing.
+ */
+ private Importer.IndexKey indexKey;
+
+ /** Initial capacity of re-usable buffer used in key compares. */
+ private static final int CAP = 32;
+
+ /**
+ * This buffer is reused during key compares. It's main purpose is to keep
+ * memory footprint as small as possible.
+ */
+ private ByteBuffer keyBuffer = ByteBuffer.allocate(CAP);
+
+ /**
+ * Set to {@code true} if the buffer should not be recycled. Used when the
+ * importer/rebuild index process is doing phase one cleanup and flushing
+ * buffers not completed.
+ */
+ private boolean discarded;
+
+
+ /**
+ * Create an instance of a IndexBuffer using the specified size.
+ *
+ * @param size The size of the underlying byte array.
+ */
+ public IndexOutputBuffer(int size) {
+ this.size = size;
+ this.buffer = new byte[size];
+ this.bytesLeft = size;
+ this.recordOffset = size - 1;
+ }
+
+
+ /**
+ * Reset an IndexBuffer so it can be re-cycled.
+ */
+ public void reset() {
+ bytesLeft = size;
+ keyOffset = 0;
+ recordOffset = size - 1;
+ keys = 0;
+ position = 0;
+ comparator = null;
+ indexKey = null;
+ }
+
+ /**
+ * Creates a new poison buffer. Poison buffers are used to stop the processing of import tasks.
+ *
+ * @return a new poison buffer
+ */
+ public static IndexOutputBuffer poison()
+ {
+ return new IndexOutputBuffer(0);
+ }
+
+ /**
+ * Set the ID of a buffer to the specified value.
+ *
+ * @param id The value to set the ID to.
+ */
+ public void setID(long id)
+ {
+ this.id = id;
+ }
+
+
+ /**
+ * Return the ID of a buffer.
+ *
+ * @return The value of a buffer's ID.
+ */
+ private long getBufferID()
+ {
+ return this.id;
+ }
+
+
+ /**
+ * Determines if a buffer is a poison buffer. A poison buffer is used to
+ * shutdown work queues when import/rebuild index phase one is completed.
+ * A poison buffer has a 0 size.
+ *
+ * @return {@code true} if a buffer is a poison buffer, or {@code false}
+ * otherwise.
+ */
+ public boolean isPoison()
+ {
+ return size == 0;
+ }
+
+ /**
+ * Determines if buffer should be re-cycled by calling {@link #reset()}.
+ *
+ * @return {@code true} if buffer should be recycled, or {@code false} if it
+ * should not.
+ */
+ public boolean isDiscarded()
+ {
+ return discarded;
+ }
+
+ /**
+ * Sets the discarded flag to {@code true}.
+ */
+ public void discard()
+ {
+ discarded = true;
+ }
+
+ /**
+ * Returns {@code true} if there is enough space available to write the
+ * specified byte array in the buffer. It returns {@code false} otherwise.
+ *
+ * @param kBytes The byte array to check space against.
+ * @param id The id value to check space against.
+ * @return {@code true} if there is space to write the byte array in a
+ * buffer, or {@code false} otherwise.
+ */
+ public boolean isSpaceAvailable(byte[] kBytes, long id) {
+ return getRequiredSize(kBytes.length, id) < bytesLeft;
+ }
+
+ /**
+ * Set the comparator to be used in the buffer processing to the specified
+ * comparator.
+ *
+ * @param comparator The comparator to set the buffer's comparator to.
+ */
+ public void setComparator(ComparatorBuffer<byte[]> comparator)
+ {
+ this.comparator = comparator;
+ }
+
+
+ /**
+ * Return a buffer's current position value.
+ *
+ * @return The buffer's current position value.
+ */
+ public int getPosition()
+ {
+ return position;
+ }
+
+
+ /**
+ * Set a buffer's position value to the specified position.
+ *
+ * @param position The value to set the position to.
+ */
+ public void setPosition(int position)
+ {
+ this.position = position;
+ }
+
+
+ /**
+ * Sort the buffer.
+ */
+ public void sort() {
+ sort(0, keys);
+ }
+
+
+ /**
+ * Add the specified key byte array and EntryID to the buffer.
+ *
+ * @param keyBytes The key byte array.
+ * @param entryID The EntryID.
+ * @param indexID The index ID the record belongs.
+ * @param insert <CODE>True</CODE> if key is an insert, false otherwise.
+ */
+ public void add(byte[] keyBytes, EntryID entryID, int indexID,
+ boolean insert) {
+ // write the record data, but leave the space to write the record size just
+ // before it
+ recordOffset = addRecord(keyBytes, entryID.longValue(), indexID, insert);
+ // then write the returned record size
+ System.arraycopy(getIntBytes(recordOffset), 0, buffer, keyOffset, INT_SIZE);
+ keyOffset += INT_SIZE;
+ bytesLeft = recordOffset - keyOffset;
+ keys++;
+ }
+
+
+ /**
+ * Writes the full record minus the record size itself.
+ */
+ private int addRecord(byte[]key, long id, int indexID, boolean insert)
+ {
+ int retOffset = recordOffset - getRecordSize(key.length, id);
+ int offSet = retOffset;
+
+ // write the INS/DEL bit
+ buffer[offSet++] = insert ? INS : DEL;
+ // write the indexID
+ System.arraycopy(getIntBytes(indexID), 0, buffer, offSet, INT_SIZE);
+ offSet += INT_SIZE;
+ // write the entryID
+ offSet = PackedInteger.writeLong(buffer, offSet, id);
+ // write the key length
+ offSet = PackedInteger.writeInt(buffer, offSet, key.length);
+ // write the key bytes
+ System.arraycopy(key, 0, buffer, offSet, key.length);
+ return retOffset;
+ }
+
+
+ /**
+ * Computes the full size of the record.
+ *
+ * @param keyLen The length of the key of index
+ * @param id The entry id
+ * @return The size that such record would take in the IndexOutputBuffer
+ */
+ public static int getRequiredSize(int keyLen, long id)
+ {
+ // Adds up the key length + key bytes + entryID + indexID + the INS/DEL bit
+ // and finally the space needed to store the record size
+ return getRecordSize(keyLen, id) + INT_SIZE;
+ }
+
+ private static int getRecordSize(int keyLen, long id)
+ {
+ // Adds up the key length + key bytes + ...
+ return PackedInteger.getWriteIntLength(keyLen) + keyLen +
+ // ... entryID + (indexID + INS/DEL bit).
+ PackedInteger.getWriteLongLength(id) + REC_OVERHEAD;
+ }
+
+
+ /**
+ * Write record at specified index to the specified output stream. Used when
+ * when writing the index scratch files.
+
+ * @param stream The stream to write the record at the index to.
+ * @param index The index of the record to write.
+ */
+ public void writeID(ByteArrayOutputStream stream, int index)
+ {
+ int offSet = getIntegerValue(index * INT_SIZE);
+ int len = PackedInteger.getReadLongLength(buffer, offSet + REC_OVERHEAD);
+ stream.write(buffer, offSet + REC_OVERHEAD, len);
+ }
+
+
+ /**
+ * Return {@code true} if the record specified by the index is an insert
+ * record, or {@code false} if it a delete record.
+ *
+ * @param index The index of the record.
+ *
+ * @return {@code true} if the record is an insert record, or {@code false}
+ * if it is a delete record.
+ */
+ public boolean isInsertRecord(int index)
+ {
+ int recOffset = getIntegerValue(index * INT_SIZE);
+ return buffer[recOffset] != DEL;
+ }
+
+
+ /**
+ * Return the size of the key part of the record.
+ *
+ * @return The size of the key part of the record.
+ */
+ public int getKeySize()
+ {
+ int offSet = getIntegerValue(position * INT_SIZE) + REC_OVERHEAD;
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ return PackedInteger.readInt(buffer, offSet);
+ }
+
+
+ /**
+ * Return the key value part of a record indicated by the current buffer
+ * position.
+ *
+ * @return byte array containing the key value.
+ */
+ public byte[] getKey()
+ {
+ return getKey(position);
+ }
+
+ /** Used to minimized memory usage when comparing keys. */
+ private ByteBuffer getKeyBuf(int x)
+ {
+ keyBuffer.clear();
+ int offSet = getIntegerValue(x * INT_SIZE) + REC_OVERHEAD;
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ int keyLen = PackedInteger.readInt(buffer, offSet);
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ //Re-allocate if the key is bigger than the capacity.
+ if(keyLen > keyBuffer.capacity())
+ {
+ keyBuffer = ByteBuffer.allocate(keyLen);
+ }
+ keyBuffer.put(buffer, offSet, keyLen);
+ keyBuffer.flip();
+ return keyBuffer;
+ }
+
+
+ /**
+ * Return the key value part of a record specified by the index.
+ *
+ * @param x index to return.
+ * @return byte array containing the key value.
+ */
+ private byte[] getKey(int x)
+ {
+ int offSet = getIntegerValue(x * INT_SIZE) + REC_OVERHEAD;
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ int keyLen = PackedInteger.readInt(buffer, offSet);
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ byte[] key = new byte[keyLen];
+ System.arraycopy(buffer, offSet, key, 0, keyLen);
+ return key;
+ }
+
+
+ private int getIndexID(int x)
+ {
+ return getIntegerValue(getIntegerValue(x * INT_SIZE) + 1);
+ }
+
+
+ /**
+ * Return index id associated with the current position's record.
+ *
+ * @return The index id.
+ */
+ public int getIndexID()
+ {
+ return getIntegerValue(getIntegerValue(position * INT_SIZE) + 1);
+ }
+
+
+ private boolean is(int x, int y, CompareOp op)
+ {
+ int xoffSet = getIntegerValue(x * INT_SIZE);
+ int xIndexID = getIntegerValue(xoffSet + 1);
+ xoffSet += REC_OVERHEAD;
+ xoffSet += PackedInteger.getReadIntLength(buffer, xoffSet);
+ int xKeyLen = PackedInteger.readInt(buffer, xoffSet);
+ int xKey = PackedInteger.getReadIntLength(buffer, xoffSet) + xoffSet;
+ int yoffSet = getIntegerValue(y * INT_SIZE);
+ int yIndexID = getIntegerValue(yoffSet + 1);
+ yoffSet += REC_OVERHEAD;
+ yoffSet += PackedInteger.getReadIntLength(buffer, yoffSet);
+ int yKeyLen = PackedInteger.readInt(buffer, yoffSet);
+ int yKey = PackedInteger.getReadIntLength(buffer, yoffSet) + yoffSet;
+ return evaluateReturnCode(comparator.compare(buffer, xKey, xKeyLen,
+ xIndexID, yKey, yKeyLen, yIndexID), op);
+ }
+
+
+ private boolean is(int x, byte[] yKey, CompareOp op, int yIndexID)
+ {
+ int xoffSet = getIntegerValue(x * INT_SIZE);
+ int xIndexID = getIntegerValue(xoffSet + 1);
+ xoffSet += REC_OVERHEAD;
+ xoffSet += PackedInteger.getReadIntLength(buffer, xoffSet);
+ int xKeyLen = PackedInteger.readInt(buffer, xoffSet);
+ int xKey = PackedInteger.getReadIntLength(buffer, xoffSet) + xoffSet;
+ return evaluateReturnCode(comparator.compare(buffer, xKey, xKeyLen,
+ xIndexID, yKey, yKey.length, yIndexID), op);
+ }
+
+
+ /**
+ * Compare the byte array at the current position with the specified one and
+ * using the specified index id. It will return {@code true} if the byte
+ * array at the current position is equal to the specified byte array as
+ * determined by the comparator and the index ID is is equal. It will
+ * return {@code false} otherwise.
+ *
+ * @param b The byte array to compare.
+ * @param bIndexID The index key.
+ * @return <CODE>True</CODE> if the byte arrays are equal.
+ */
+ public boolean compare(byte[]b, int bIndexID)
+ {
+ int offset = getIntegerValue(position * INT_SIZE);
+ int indexID = getIntegerValue(offset + 1);
+ offset += REC_OVERHEAD;
+ offset += PackedInteger.getReadIntLength(buffer, offset);
+ int keyLen = PackedInteger.readInt(buffer, offset);
+ int key = PackedInteger.getReadIntLength(buffer, offset) + offset;
+ return comparator.compare(buffer, key, keyLen, b, b.length) == 0
+ && indexID == bIndexID;
+ }
+
+
+ /**
+ * Compare current IndexBuffer to the specified index buffer using both the
+ * comparator and index ID of both buffers.
+ *
+ * The key at the value of position in both buffers are used in the compare.
+ *
+ * @param b The IndexBuffer to compare to.
+ * @return 0 if the buffers are equal, -1 if the current buffer is less
+ * than the specified buffer, or 1 if it is greater.
+ */
+ @Override
+ public int compareTo(IndexOutputBuffer b)
+ {
+ final ByteBuffer keyBuf = b.getKeyBuf(b.position);
+ int offset = getIntegerValue(position * INT_SIZE);
+ int indexID = getIntegerValue(offset + 1);
+ offset += REC_OVERHEAD;
+ offset += PackedInteger.getReadIntLength(buffer, offset);
+ int keyLen = PackedInteger.readInt(buffer, offset);
+ int key = PackedInteger.getReadIntLength(buffer, offset) + offset;
+
+ final int cmp = comparator.compare(buffer, key, keyLen, keyBuf.array(), keyBuf.limit());
+ if (cmp != 0)
+ {
+ return cmp;
+ }
+
+ final int bIndexID = b.getIndexID();
+ if (indexID == bIndexID)
+ {
+ // This is tested in a tree set remove when a buffer is removed from the tree set.
+ return compare(this.id, b.getBufferID());
+ }
+ else if (indexID < bIndexID)
+ {
+ return -1;
+ }
+ else
+ {
+ return 1;
+ }
+ }
+
+ private int compare(long l1, long l2)
+ {
+ if (l1 == l2)
+ {
+ return 0;
+ }
+ else if (l1 < l2)
+ {
+ return -1;
+ }
+ else
+ {
+ return 1;
+ }
+ }
+
+
+ /**
+ * Write a record to specified output stream using the record pointed to by
+ * the current position and the specified byte stream of ids.
+ *
+ * @param dataStream The data output stream to write to.
+ *
+ * @throws IOException If an I/O error occurs writing the record.
+ */
+ public void writeKey(DataOutputStream dataStream) throws IOException
+ {
+ int offSet = getIntegerValue(position * INT_SIZE) + REC_OVERHEAD;
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ int keyLen = PackedInteger.readInt(buffer, offSet);
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ dataStream.write(buffer, offSet, keyLen);
+ }
+
+ /**
+ * Compare the byte array at the current position with the byte array at the
+ * specified index.
+ *
+ * @param i The index pointing to the byte array to compare.
+ * @return {@code true} if the byte arrays are equal, or {@code false} otherwise
+ */
+ public boolean compare(int i)
+ {
+ return is(i, position, CompareOp.EQ);
+ }
+
+ /**
+ * Return the current number of keys.
+ *
+ * @return The number of keys currently in an index buffer.
+ */
+ public int getNumberKeys()
+ {
+ return keys;
+ }
+
+ /**
+ * Return {@code true} if the buffer has more data to process, or
+ * {@code false} otherwise. Used when iterating over the buffer writing the
+ * scratch index file.
+ *
+ * @return {@code true} if a buffer has more data to process, or
+ * {@code false} otherwise.
+ */
+ public boolean hasMoreData()
+ {
+ return position + 1 < keys;
+ }
+
+ /**
+ * Advance the position pointer to the next record in the buffer. Used when
+ * iterating over the buffer examining keys.
+ */
+ public void nextRecord()
+ {
+ position++;
+ }
+
+ private byte[] getIntBytes(int val)
+ {
+ for (int i = 3; i >= 0; i--) {
+ intBytes[i] = (byte) (val & 0xff);
+ val >>>= 8;
+ }
+ return intBytes;
+ }
+
+
+ private int getIntegerValue(int index)
+ {
+ int answer = 0;
+ for (int i = 0; i < INT_SIZE; i++) {
+ byte b = buffer[index + i];
+ answer <<= 8;
+ answer |= (b & 0xff);
+ }
+ return answer;
+ }
+
+
+ private int med3(int a, int b, int c)
+ {
+ return is(a, b, CompareOp.LT) ?
+ (is(b,c,CompareOp.LT) ? b : is(a,c,CompareOp.LT) ? c : a) :
+ (is(b,c,CompareOp.GT) ? b : is(a,c,CompareOp.GT) ? c : a);
+ }
+
+
+ private void sort(int off, int len)
+ {
+ if (len < 7) {
+ for (int i=off; i<len+off; i++)
+ {
+ for (int j=i; j>off && is(j-1, j, CompareOp.GT); j--)
+ {
+ swap(j, j-1);
+ }
+ }
+ return;
+ }
+
+ int m = off + (len >> 1);
+ if (len > 7) {
+ int l = off;
+ int n = off + len - 1;
+ if (len > 40) {
+ int s = len/8;
+ l = med3(l, l+s, l+2*s);
+ m = med3(m-s, m, m+s);
+ n = med3(n-2*s, n-s, n);
+ }
+ m = med3(l, m, n);
+ }
+
+ byte[] mKey = getKey(m);
+ int mIndexID = getIndexID(m);
+
+ int a = off, b = a, c = off + len - 1, d = c;
+ while(true)
+ {
+ while (b <= c && is(b, mKey, CompareOp.LE, mIndexID))
+ {
+ if (is(b, mKey, CompareOp.EQ, mIndexID))
+ {
+ swap(a++, b);
+ }
+ b++;
+ }
+ while (c >= b && is(c, mKey, CompareOp.GE, mIndexID))
+ {
+ if (is(c, mKey, CompareOp.EQ, mIndexID))
+ {
+ swap(c, d--);
+ }
+ c--;
+ }
+ if (b > c)
+ {
+ break;
+ }
+ swap(b++, c--);
+ }
+
+ // Swap partition elements back to middle
+ int s, n = off + len;
+ s = Math.min(a-off, b-a );
+ vectorSwap(off, b-s, s);
+ s = Math.min(d-c, n-d-1);
+ vectorSwap(b, n-s, s);
+
+ s = b - a;
+ // Recursively sort non-partition-elements
+ if (s > 1)
+ {
+ sort(off, s);
+ }
+ s = d - c;
+ if (s > 1)
+ {
+ sort(n-s, s);
+ }
+ }
+
+
+ private void swap(int a, int b)
+ {
+ int aOffset = a * INT_SIZE;
+ int bOffset = b * INT_SIZE;
+ int bVal = getIntegerValue(bOffset);
+ System.arraycopy(buffer, aOffset, buffer, bOffset, INT_SIZE);
+ System.arraycopy(getIntBytes(bVal), 0, buffer, aOffset, INT_SIZE);
+ }
+
+
+ private void vectorSwap(int a, int b, int n)
+ {
+ for (int i=0; i<n; i++, a++, b++)
+ {
+ swap(a, b);
+ }
+ }
+
+
+ private boolean evaluateReturnCode(int rc, CompareOp op)
+ {
+ switch(op) {
+ case LT:
+ return rc < 0;
+ case GT:
+ return rc > 0;
+ case LE:
+ return rc <= 0;
+ case GE:
+ return rc >= 0;
+ case EQ:
+ return rc == 0;
+ default:
+ return false;
+ }
+ }
+
+
+ /**
+ * Interface that defines two methods used to compare keys used in this
+ * class. The Comparator interface cannot be used in this class, so this
+ * special one is used that knows about the special properties of this class.
+ *
+ * @param <T> object to use in the compare
+ */
+ public static interface ComparatorBuffer<T> {
+
+
+ /**
+ * Compare two offsets in an object, usually a byte array.
+ *
+ * @param o The object.
+ * @param offset The first offset.
+ * @param length The first length.
+ * @param indexID The first index id.
+ * @param otherOffset The second offset.
+ * @param otherLength The second length.
+ * @param otherIndexID The second index id.
+ * @return a negative integer, zero, or a positive integer as the first
+ * offset value is less than, equal to, or greater than the second.
+ */
+ int compare(T o, int offset, int length, int indexID, int otherOffset,
+ int otherLength, int otherIndexID);
+
+
+ /**
+ * Compare an offset in an object with the specified object.
+ *
+ * @param o The first object.
+ * @param offset The first offset.
+ * @param length The first length.
+ * @param indexID The first index id.
+ * @param other The second object.
+ * @param otherLength The length of the second object.
+ * @param otherIndexID The second index id.
+ * @return a negative integer, zero, or a positive integer as the first
+ * offset value is less than, equal to, or greater than the second
+ * object.
+ */
+ int compare(T o, int offset, int length, int indexID, T other,
+ int otherLength, int otherIndexID);
+
+
+ /**
+ * Compare an offset in an object with the specified object.
+ *
+ * @param o The first object.
+ * @param offset The first offset.
+ * @param length The first length.
+ * @param other The second object.
+ * @param otherLen The length of the second object.
+ * @return a negative integer, zero, or a positive integer as the first
+ * offset value is less than, equal to, or greater than the second
+ * object.
+ */
+ int compare(T o, int offset, int length, T other,
+ int otherLen);
+
+ }
+
+
+ /**
+ * Implementation of ComparatorBuffer interface. Used to compare keys when
+ * they are non-DN indexes.
+ */
+ public static class IndexComparator implements IndexOutputBuffer.ComparatorBuffer<byte[]>
+ {
+
+ /**
+ * Compare two offsets in an byte array using the index compare
+ * algorithm. The specified index ID is used in the comparison if the
+ * byte arrays are equal.
+ *
+ * @param b The byte array.
+ * @param offset The first offset.
+ * @param length The first length.
+ * @param indexID The first index id.
+ * @param otherOffset The second offset.
+ * @param otherLength The second length.
+ * @param otherIndexID The second index id.
+ * @return a negative integer, zero, or a positive integer as the first
+ * offset value is less than, equal to, or greater than the second.
+ */
+ @Override
+ public int compare(byte[] b, int offset, int length, int indexID,
+ int otherOffset, int otherLength, int otherIndexID)
+ {
+ for(int i = 0; i < length && i < otherLength; i++)
+ {
+ if(b[offset + i] > b[otherOffset + i])
+ {
+ return 1;
+ }
+ else if (b[offset + i] < b[otherOffset + i])
+ {
+ return -1;
+ }
+ }
+ return compareLengthThenIndexID(length, indexID, otherLength, otherIndexID);
+ }
+
+ /**
+ * Compare an offset in an byte array with the specified byte array,
+ * using the DN compare algorithm. The specified index ID is used in the
+ * comparison if the byte arrays are equal.
+ *
+ * @param b The byte array.
+ * @param offset The first offset.
+ * @param length The first length.
+ * @param indexID The first index id.
+ * @param other The second byte array to compare to.
+ * @param otherLength The second byte array's length.
+ * @param otherIndexID The second index id.
+ * @return a negative integer, zero, or a positive integer as the first
+ * offset value is less than, equal to, or greater than the second
+ * byte array.
+ */
+ @Override
+ public int compare(byte[] b, int offset, int length, int indexID,
+ byte[] other, int otherLength, int otherIndexID)
+ {
+ for(int i = 0; i < length && i < otherLength; i++)
+ {
+ if(b[offset + i] > other[i])
+ {
+ return 1;
+ }
+ else if (b[offset + i] < other[i])
+ {
+ return -1;
+ }
+ }
+ return compareLengthThenIndexID(length, indexID, otherLength, otherIndexID);
+ }
+
+ /**
+ * The arrays are equal, make sure they are in the same index
+ * since multiple suffixes might have the same key.
+ */
+ private int compareLengthThenIndexID(int length, int indexID, int otherLength, int otherIndexID)
+ {
+ if (length == otherLength)
+ {
+ return compare(indexID, otherIndexID);
+ }
+ else if (length > otherLength)
+ {
+ return 1;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+
+ /**
+ * Compare an offset in an byte array with the specified byte array,
+ * using the DN compare algorithm.
+ *
+ * @param b The byte array.
+ * @param offset The first offset.
+ * @param length The first length.
+ * @param other The second byte array to compare to.
+ * @param otherLength The second byte array's length.
+ * @return a negative integer, zero, or a positive integer as the first
+ * offset value is less than, equal to, or greater than the second
+ * byte array.
+ */
+ @Override
+ public int compare(byte[] b, int offset, int length, byte[] other,
+ int otherLength)
+ {
+ for(int i = 0; i < length && i < otherLength; i++)
+ {
+ if(b[offset + i] > other[i])
+ {
+ return 1;
+ }
+ else if (b[offset + i] < other[i])
+ {
+ return -1;
+ }
+ }
+ return compare(length, otherLength);
+ }
+
+ private int compare(int i1, int i2)
+ {
+ if (i1 == i2)
+ {
+ return 0;
+ }
+ else if (i1 > i2)
+ {
+ return 1;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+ }
+
+
+ /**
+ * Set the index key associated with an index buffer.
+ *
+ * @param indexKey The index key.
+ */
+ public void setIndexKey(Importer.IndexKey indexKey)
+ {
+ this.indexKey = indexKey;
+ }
+
+
+ /**
+ * Return the index key of an index buffer.
+ * @return The index buffer's index key.
+ */
+ public Importer.IndexKey getIndexKey()
+ {
+ return indexKey;
+ }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/NullIndex.java b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/NullIndex.java
index 9611709..76ff705 100644
--- a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/NullIndex.java
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/NullIndex.java
@@ -29,7 +29,6 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ConditionResult;
import org.forgerock.opendj.ldap.spi.IndexingOptions;
-import org.opends.server.backends.jeb.importLDIF.ImportIDSet;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.Modification;
diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/Suffix.java b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/Suffix.java
new file mode 100644
index 0000000..260dfba
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/Suffix.java
@@ -0,0 +1,369 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2009-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2014-2015 ForgeRock AS
+ */
+package org.opends.server.backends.jeb;
+
+import static org.opends.messages.JebMessages.*;
+import static org.opends.server.util.ServerConstants.*;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.opends.server.backends.jeb.Importer.DNCache;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.DN;
+
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.LockMode;
+
+/**
+ * The class represents a suffix that is to be loaded during an import, or
+ * rebuild index process. Multiple instances of this class can be instantiated
+ * during and import to support multiple suffixes in a backend. A rebuild
+ * index has only one of these instances.
+ */
+class Suffix
+{
+
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+ private final List<DN> includeBranches, excludeBranches;
+ private final DN baseDN;
+ private final EntryContainer srcEntryContainer;
+ private final EntryContainer entryContainer;
+ private final Object synchObject = new Object();
+ private static final int PARENT_ID_SET_SIZE = 16 * 1024;
+ private final ConcurrentHashMap<DN, CountDownLatch> pendingMap =
+ new ConcurrentHashMap<DN, CountDownLatch>();
+ private final Set<DN> parentSet = new HashSet<DN>(PARENT_ID_SET_SIZE);
+ private DN parentDN;
+
+ /**
+ * Creates a suffix instance using the specified parameters.
+ *
+ * @param entryContainer The entry container pertaining to the suffix.
+ * @param srcEntryContainer The original entry container.
+ * @param includeBranches The include branches.
+ * @param excludeBranches The exclude branches.
+ */
+ Suffix(EntryContainer entryContainer, EntryContainer srcEntryContainer,
+ List<DN> includeBranches, List<DN> excludeBranches)
+ {
+ this.entryContainer = entryContainer;
+ this.srcEntryContainer = srcEntryContainer;
+ this.baseDN = entryContainer.getBaseDN();
+ if (includeBranches != null)
+ {
+ this.includeBranches = includeBranches;
+ }
+ else
+ {
+ this.includeBranches = new ArrayList<DN>(0);
+ }
+ if (excludeBranches != null)
+ {
+ this.excludeBranches = excludeBranches;
+ }
+ else
+ {
+ this.excludeBranches = new ArrayList<DN>(0);
+ }
+ }
+
+ /**
+ * Returns the DN2ID instance pertaining to a suffix instance.
+ *
+ * @return A DN2ID instance that can be used to manipulate the DN2ID database.
+ */
+ public DN2ID getDN2ID()
+ {
+ return entryContainer.getDN2ID();
+ }
+
+
+ /**
+ * Returns the ID2Entry instance pertaining to a suffix instance.
+ *
+ * @return A ID2Entry instance that can be used to manipulate the ID2Entry
+ * database.
+ */
+ public ID2Entry getID2Entry()
+ {
+ return entryContainer.getID2Entry();
+ }
+
+
+ /**
+ * Returns the DN2URI instance pertaining to a suffix instance.
+ *
+ * @return A DN2URI instance that can be used to manipulate the DN2URI
+ * database.
+ */
+ public DN2URI getDN2URI()
+ {
+ return entryContainer.getDN2URI();
+ }
+
+
+ /**
+ * Returns the entry container pertaining to a suffix instance.
+ *
+ * @return The entry container used to create a suffix instance.
+ */
+ public EntryContainer getEntryContainer()
+ {
+ return entryContainer;
+ }
+
+
+ /**
+ * Return the Attribute Type - Index map used to map an attribute type to an
+ * index instance.
+ *
+ * @return A suffixes Attribute Type - Index map.
+ */
+ public Map<AttributeType, AttributeIndex> getAttrIndexMap()
+ {
+ return entryContainer.getAttributeIndexMap();
+ }
+
+
+ /**
+ * Make sure the specified parent DN is not in the pending map.
+ *
+ * @param parentDN The DN of the parent.
+ */
+ private void assureNotPending(DN parentDN) throws InterruptedException
+ {
+ final CountDownLatch l = pendingMap.get(parentDN);
+ if (l != null)
+ {
+ l.await();
+ }
+ }
+
+
+ /**
+ * Add specified DN to the pending map.
+ *
+ * @param dn The DN to add to the map.
+ */
+ public void addPending(DN dn)
+ {
+ pendingMap.putIfAbsent(dn, new CountDownLatch(1));
+ }
+
+
+ /**
+ * Remove the specified DN from the pending map, it may not exist if the
+ * entries are being migrated so just return.
+ *
+ * @param dn The DN to remove from the map.
+ */
+ public void removePending(DN dn)
+ {
+ CountDownLatch l = pendingMap.remove(dn);
+ if(l != null)
+ {
+ l.countDown();
+ }
+ }
+
+
+ /**
+ * Return {@code true} if the specified dn is contained in the parent set, or
+ * in the specified DN cache. This would indicate that the parent has already
+ * been processed. It returns {@code false} otherwise.
+ *
+ * It will optionally check the dn2id database for the dn if the specified
+ * cleared backend boolean is {@code true}.
+ *
+ * @param dn The DN to check for.
+ * @param dnCache The importer DN cache.
+ * @param clearedBackend Set to {@code true} if the import process cleared the
+ * backend before processing.
+ * @return {@code true} if the dn is contained in the parent ID, or
+ * {@code false} otherwise.
+ *
+ * @throws DatabaseException If an error occurred searching the DN cache, or
+ * dn2id database.
+ * @throws InterruptedException If an error occurred processing the pending
+ * map.
+ */
+ public
+ boolean isParentProcessed(DN dn, DNCache dnCache, boolean clearedBackend)
+ throws DatabaseException, InterruptedException {
+ synchronized(synchObject) {
+ if(parentSet.contains(dn))
+ {
+ return true;
+ }
+ }
+ //The DN was not in the parent set. Make sure it isn't pending.
+ try {
+ assureNotPending(dn);
+ } catch (InterruptedException e) {
+ logger.error(ERR_JEB_IMPORT_LDIF_PENDING_ERR, e.getMessage());
+ throw e;
+ }
+ // Either parent is in the DN cache,
+ // or else check the dn2id database for the DN (only if backend wasn't cleared)
+ final boolean parentThere = dnCache.contains(dn)
+ || (!clearedBackend
+ && getDN2ID().get(null, dn, LockMode.DEFAULT) != null);
+ //Add the DN to the parent set if needed.
+ if (parentThere) {
+ synchronized(synchObject) {
+ if (parentSet.size() >= PARENT_ID_SET_SIZE) {
+ Iterator<DN> iterator = parentSet.iterator();
+ iterator.next();
+ iterator.remove();
+ }
+ parentSet.add(dn);
+ }
+ }
+ return parentThere;
+ }
+
+
+ /**
+ * Sets the trusted status of all of the indexes, vlvIndexes, id2children
+ * and id2subtree indexes.
+ *
+ * @param trusted True if the indexes should be trusted or false
+ * otherwise.
+ *
+ * @throws DatabaseException If an error occurred setting the indexes to
+ * trusted.
+ */
+ public void setIndexesTrusted(boolean trusted) throws DatabaseException
+ {
+ entryContainer.getID2Children().setTrusted(null,trusted);
+ entryContainer.getID2Subtree().setTrusted(null, trusted);
+ for (AttributeIndex attributeIndex : entryContainer.getAttributeIndexes())
+ {
+ setTrusted(attributeIndex.getEqualityIndex(), trusted);
+ setTrusted(attributeIndex.getPresenceIndex(), trusted);
+ setTrusted(attributeIndex.getSubstringIndex(), trusted);
+ setTrusted(attributeIndex.getOrderingIndex(), trusted);
+ setTrusted(attributeIndex.getApproximateIndex(), trusted);
+ Map<String, Collection<Index>> exIndexes = attributeIndex.getExtensibleIndexes();
+ if(!exIndexes.isEmpty())
+ {
+ setTrusted(exIndexes.get(EXTENSIBLE_INDEXER_ID_SUBSTRING), trusted);
+ setTrusted(exIndexes.get(EXTENSIBLE_INDEXER_ID_SHARED), trusted);
+ }
+ }
+ for(VLVIndex vlvIdx : entryContainer.getVLVIndexes()) {
+ vlvIdx.setTrusted(null, trusted);
+ }
+ }
+
+ private void setTrusted(Index index, boolean trusted)
+ {
+ if (index != null)
+ {
+ index.setTrusted(null, trusted);
+ }
+ }
+
+ private void setTrusted(Collection<Index> subIndexes, boolean trusted)
+ {
+ if (subIndexes != null)
+ {
+ for (Index subIndex : subIndexes)
+ {
+ subIndex.setTrusted(null, trusted);
+ }
+ }
+ }
+
+ /**
+ * Get the parent DN of the last entry added to a suffix.
+ *
+ * @return The parent DN of the last entry added.
+ */
+ public DN getParentDN()
+ {
+ return parentDN;
+ }
+
+
+ /**
+ * Set the parent DN of the last entry added to a suffix.
+ *
+ * @param parentDN The parent DN to save.
+ */
+ public void setParentDN(DN parentDN)
+ {
+ this.parentDN = parentDN;
+ }
+
+ /**
+ * Return a src entry container.
+ *
+ * @return The src entry container.
+ */
+ public EntryContainer getSrcEntryContainer()
+ {
+ return this.srcEntryContainer;
+ }
+
+
+ /**
+ * Return include branches.
+ *
+ * @return The include branches.
+ */
+ public List<DN> getIncludeBranches()
+ {
+ return this.includeBranches;
+ }
+
+
+ /**
+ * Return exclude branches.
+ *
+ * @return the exclude branches.
+ */
+ public List<DN> getExcludeBranches()
+ {
+ return this.excludeBranches;
+ }
+
+
+ /**
+ * Return base DN.
+ *
+ * @return The base DN.
+ */
+ public DN getBaseDN()
+ {
+ return this.baseDN;
+ }
+}
--
Gitblit v1.10.0