From c7f2a12383bcdc62e8b449bd2844552d70e0ecf6 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 03 Feb 2015 17:00:51 +0000
Subject: [PATCH] Remove org.opends.server.backends.jeb.importLDIF package
---
/dev/null | 35 -----------------------------------
1 files changed, 0 insertions(+), 35 deletions(-)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java
deleted file mode 100644
index 2186294..0000000
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java
+++ /dev/null
@@ -1,480 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS
- */
-package org.opends.server.backends.jeb.importLDIF;
-
-import java.nio.ByteBuffer;
-
-import org.opends.server.backends.jeb.EntryID;
-import org.opends.server.backends.jeb.JebFormat;
-
-/**
- * This class manages the set of ID that are to be eventually added to an index
- * database. It is responsible for determining if the number of IDs is above
- * the configured ID limit. If the limit it reached, the class stops tracking
- * individual IDs and marks the set as undefined. This class is not thread safe.
- */
-public class ImportIDSet {
-
- /** The internal array where elements are stored. */
- private long[] array;
- /** The number of valid elements in the array. */
- private int count;
- /** Boolean to keep track if the instance is defined or not. */
- private boolean isDefined = true;
- /** Size of the undefined if count is kept. */
- private long undefinedSize;
- /** Key related to an ID set. */
- private ByteBuffer key;
- /** The entry limit size. */
- private final int limit;
- /** Set to true if a count of ids above the entry limit should be kept. */
- private final boolean doCount;
-
- /**
- * Create an import ID set of the specified size, index limit and index
- * maintain count, plus an extra 128 slots.
- *
- * @param size The size of the the underlying array, plus some extra space.
- * @param limit The index entry limit.
- * @param doCount The index maintain count.
- */
- public ImportIDSet(int size, int limit, boolean doCount)
- {
- this.array = new long[size + 128];
- // A limit of 0 means unlimited.
- this.limit = limit == 0 ? Integer.MAX_VALUE : limit;
- this.doCount = doCount;
- }
-
- /** Create an empty import instance. */
- public ImportIDSet()
- {
- this.limit = -1;
- this.doCount = false;
- }
-
- /**
- * Clear the set so it can be reused again. The boolean indexParam specifies
- * if the index parameters should be cleared also.
- */
- public void clear()
- {
- undefinedSize = 0;
- isDefined = true;
- count = 0;
- }
-
- /**
- * Return if an import ID set is defined or not.
- *
- * @return <CODE>True</CODE> if an import ID set is defined.
- */
- public boolean isDefined()
- {
- return isDefined;
- }
-
- /** Set an import ID set to undefined. */
- private void setUndefined() {
- array = null;
- isDefined = false;
- }
-
- /**
- * Add the specified entry id to an import ID set.
- *
- * @param entryID The entry ID to add to an import ID set.
- */
- void addEntryID(EntryID entryID) {
- addEntryID(entryID.longValue());
- }
-
- /**
- * Add the specified long value to an import ID set.
- *
- * @param l The long value to add to an import ID set.
- */
- void addEntryID(long l) {
- if(!isDefined()) {
- if(doCount) {
- undefinedSize++;
- }
- return;
- }
- if (l < 0 || (isDefined() && count + 1 > limit))
- {
- setUndefined();
- if(doCount) {
- undefinedSize = count + 1;
- } else {
- undefinedSize = Long.MAX_VALUE;
- }
- count = 0;
- } else {
- add(l);
- }
- }
-
- private boolean mergeCount(byte[] dBbytes, ImportIDSet importIdSet) {
- boolean incrementLimitCount=false;
- boolean dbUndefined = isDBUndefined(dBbytes);
-
- if (dbUndefined && !importIdSet.isDefined()) {
- undefinedSize = JebFormat.entryIDUndefinedSizeFromDatabase(dBbytes) +
- importIdSet.undefinedSize;
- isDefined=false;
- } else if (dbUndefined && importIdSet.isDefined()) {
- undefinedSize = JebFormat.entryIDUndefinedSizeFromDatabase(dBbytes) +
- importIdSet.size();
- isDefined=false;
- } else if(!importIdSet.isDefined()) {
- int dbSize = JebFormat.entryIDListFromDatabase(dBbytes).length;
- undefinedSize = dbSize + importIdSet.undefinedSize;
- isDefined = false;
- incrementLimitCount = true;
- } else {
- array = JebFormat.entryIDListFromDatabase(dBbytes);
- if(array.length + importIdSet.size() > limit) {
- undefinedSize = array.length + importIdSet.size();
- isDefined=false;
- incrementLimitCount=true;
- } else {
- count = array.length;
- addAll(importIdSet);
- }
- }
- return incrementLimitCount;
- }
-
- /**
- * Remove the specified import ID set from the byte array read from the DB.
- *
- * @param bytes The byte array read from JEB.
- * @param importIdSet The import ID set to delete.
- */
- public void remove(byte[] bytes, ImportIDSet importIdSet)
- {
- if (isDBUndefined(bytes)) {
- isDefined=false;
- importIdSet.setUndefined();
- undefinedSize = Long.MAX_VALUE;
- } else if(!importIdSet.isDefined()) {
- isDefined=false;
- undefinedSize = Long.MAX_VALUE;
- } else {
- array = JebFormat.entryIDListFromDatabase(bytes);
- if(array.length - importIdSet.size() > limit) {
- isDefined=false;
- count = 0;
- importIdSet.setUndefined();
- undefinedSize = Long.MAX_VALUE;
- } else {
- count = array.length;
- removeAll(importIdSet);
- }
- }
- }
-
- /**
- * Merge the specified byte array read from a DB, with the specified import
- * ID set. The specified limit and maintain count parameters define
- * if the newly merged set is defined or not.
- *
- * @param bytes The byte array of IDs read from a DB.
- * @param importIdSet The import ID set to merge the byte array with.
- * @return <CODE>True</CODE> if the import ID set started keeping a count as
- * a result of the merge.
- */
- public boolean merge(byte[] bytes, ImportIDSet importIdSet)
- {
- boolean incrementLimitCount=false;
- if(doCount) {
- incrementLimitCount = mergeCount(bytes, importIdSet);
- } else if (isDBUndefined(bytes)) {
- isDefined = false;
- importIdSet.setUndefined();
- undefinedSize = Long.MAX_VALUE;
- count = 0;
- } else if(!importIdSet.isDefined()) {
- isDefined = false;
- incrementLimitCount = true;
- undefinedSize = Long.MAX_VALUE;
- count = 0;
- } else {
- array = JebFormat.entryIDListFromDatabase(bytes);
- if (array.length + importIdSet.size() > limit) {
- isDefined = false;
- incrementLimitCount = true;
- count = 0;
- importIdSet.setUndefined();
- undefinedSize = Long.MAX_VALUE;
- } else {
- count = array.length;
- addAll(importIdSet);
- }
- }
- return incrementLimitCount;
- }
-
- private boolean isDBUndefined(byte[] bytes)
- {
- return (bytes[0] & 0x80) == 0x80;
- }
-
- private void removeAll(ImportIDSet that) {
- long[] newArray = new long[array.length];
- int c = 0;
- for(int i=0; i < count; i++)
- {
- if(binarySearch(that.array, that.count, array[i]) < 0)
- {
- newArray[c++] = array[i];
- }
- }
- array = newArray;
- count = c;
- }
-
- private void addAll(ImportIDSet that) {
- resize(this.count+that.count);
-
- if (that.count == 0)
- {
- return;
- }
-
- // Optimize for the case where the two sets are sure to have no overlap.
- if (this.count == 0 || that.array[0] > this.array[this.count-1])
- {
- System.arraycopy(that.array, 0, this.array, this.count, that.count);
- count += that.count;
- return;
- }
-
- if (this.array[0] > that.array[that.count-1])
- {
- System.arraycopy(this.array, 0, this.array, that.count, this.count);
- System.arraycopy(that.array, 0, this.array, 0, that.count);
- count += that.count;
- return;
- }
-
- int destPos = binarySearch(this.array, this.count, that.array[0]);
- if (destPos < 0)
- {
- destPos = -(destPos+1);
- }
-
- // Make space for the copy.
- int aCount = this.count - destPos;
- int aPos = destPos + that.count;
- int aEnd = aPos + aCount;
- System.arraycopy(this.array, destPos, this.array, aPos, aCount);
-
- // Optimize for the case where there is no overlap.
- if (this.array[aPos] > that.array[that.count-1])
- {
- System.arraycopy(that.array, 0, this.array, destPos, that.count);
- count += that.count;
- return;
- }
-
- int bPos;
- for ( bPos = 0; aPos < aEnd && bPos < that.count; )
- {
- if ( this.array[aPos] < that.array[bPos] )
- {
- this.array[destPos++] = this.array[aPos++];
- }
- else if ( this.array[aPos] > that.array[bPos] )
- {
- this.array[destPos++] = that.array[bPos++];
- }
- else
- {
- this.array[destPos++] = this.array[aPos++];
- bPos++;
- }
- }
-
- // Copy any remainder.
- int aRemain = aEnd - aPos;
- if (aRemain > 0)
- {
- System.arraycopy(this.array, aPos, this.array, destPos, aRemain);
- destPos += aRemain;
- }
-
- int bRemain = that.count - bPos;
- if (bRemain > 0)
- {
- System.arraycopy(that.array, bPos, this.array, destPos, bRemain);
- destPos += bRemain;
- }
-
- count = destPos;
- }
-
- /**
- * Return the number of IDs in an import ID set.
- *
- * @return The current size of an import ID set.
- */
- public int size()
- {
- return count;
- }
-
- private boolean add(long v)
- {
- resize(count+1);
-
- if (count == 0 || v > array[count-1])
- {
- array[count++] = v;
- return true;
- }
-
- int pos = binarySearch(array, count, v);
- if (pos >=0)
- {
- return false;
- }
-
- // For a negative return value r, the index -(r+1) gives the array
- // index at which the specified value can be inserted to maintain
- // the sorted order of the array.
- pos = -(pos+1);
-
- System.arraycopy(array, pos, array, pos+1, count-pos);
- array[pos] = v;
- count++;
- return true;
- }
-
- private static int binarySearch(long[] a, int count, long key)
- {
- int low = 0;
- int high = count-1;
-
- while (low <= high)
- {
- int mid = low + high >> 1;
- long midVal = a[mid];
-
- if (midVal < key)
- {
- low = mid + 1;
- }
- else if (midVal > key)
- {
- high = mid - 1;
- }
- else
- {
- return mid; // key found
- }
- }
- return -(low + 1); // key not found.
- }
-
- private void resize(int size)
- {
- if (array == null)
- {
- array = new long[size];
- }
- else if (array.length < size)
- {
- // Expand the size of the array in powers of two.
- int newSize = array.length == 0 ? 1 : array.length;
- do
- {
- newSize *= 2;
- } while (newSize < size);
-
- long[] newBytes = new long[newSize];
- System.arraycopy(array, 0, newBytes, 0, count);
- array = newBytes;
- }
- }
-
- /**
- * Create a byte array suitable to write to a JEB DB from an import ID set.
- *
- * @return A byte array suitable for writing to a JEB DB.
- */
- public byte[] toDatabase()
- {
- if(isDefined) {
- return encode();
- } else {
- return JebFormat.entryIDUndefinedSizeToDatabase(undefinedSize);
- }
- }
-
- private byte[] encode()
- {
- final int encodedSize = count * 8;
- final byte[] bytes = new byte[encodedSize];
- int pos = 0;
- for (int i = 0; i < count; i++) {
- final long id = array[i] & 0x00ffffffffL; // JNR: why is this necessary?
-
- // encode the entryID
- bytes[pos++] = (byte) ((id >>> 56) & 0xFF);
- bytes[pos++] = (byte) ((id >>> 48) & 0xFF);
- bytes[pos++] = (byte) ((id >>> 40) & 0xFF);
- bytes[pos++] = (byte) ((id >>> 32) & 0xFF);
- bytes[pos++] = (byte) ((id >>> 24) & 0xFF);
- bytes[pos++] = (byte) ((id >>> 16) & 0xFF);
- bytes[pos++] = (byte) ((id >>> 8) & 0xFF);
- bytes[pos++] = (byte) (id & 0xFF);
- }
- return bytes;
- }
-
- /**
- * Set the DB key related to an import ID set.
- *
- * @param key Byte array containing the key.
- */
- public void setKey(ByteBuffer key)
- {
- this.key = key;
- }
-
- /**
- * Return the DB key related to an import ID set.
- *
- * @return The byte array containing the key.
- */
- public ByteBuffer getKey()
- {
- return key;
- }
-}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/ImportLDIFReader.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/ImportLDIFReader.java
deleted file mode 100644
index 8b43c4a..0000000
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/ImportLDIFReader.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- * Copyright 2015 ForgeRock AS
- */
-package org.opends.server.backends.jeb.importLDIF;
-
-import static org.opends.messages.UtilityMessages.*;
-import static org.opends.server.util.StaticUtils.*;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.forgerock.i18n.LocalizableMessage;
-import org.forgerock.i18n.LocalizableMessageBuilder;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.forgerock.util.Reject;
-import org.opends.server.api.plugin.PluginResult;
-import org.opends.server.backends.jeb.EntryID;
-import org.opends.server.backends.jeb.RootContainer;
-import org.opends.server.types.AttributeBuilder;
-import org.opends.server.types.AttributeType;
-import org.opends.server.types.DN;
-import org.opends.server.types.Entry;
-import org.opends.server.types.LDIFImportConfig;
-import org.opends.server.types.ObjectClass;
-import org.opends.server.util.LDIFException;
-import org.opends.server.util.LDIFReader;
-
-/**
- * This class specializes the LDIFReader for imports.
- */
-public final class ImportLDIFReader extends LDIFReader
-{
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
- private final RootContainer rootContainer;
-
- /**
- * Creates a new LDIF reader that will read information from the specified file.
- *
- * @param importConfig
- * The import configuration for this LDIF reader. It must not be <CODE>null</CODE>.
- * @param rootContainer
- * The root container needed to get the next entry ID.
- * @throws IOException
- * If a problem occurs while opening the LDIF file for reading.
- */
- public ImportLDIFReader(LDIFImportConfig importConfig, RootContainer rootContainer) throws IOException
- {
- super(importConfig);
- Reject.ifNull(importConfig, rootContainer);
- this.rootContainer = rootContainer;
- }
-
- /**
- * Reads the next entry from the LDIF source.
- *
- * @return The next entry read from the LDIF source, or <CODE>null</CODE> if the end of the LDIF
- * data is reached.
- * @param suffixesMap
- * A map of suffixes instances.
- * @param entryInfo
- * A object to hold information about the entry ID and what suffix was selected.
- * @throws IOException
- * If an I/O problem occurs while reading from the file.
- * @throws LDIFException
- * If the information read cannot be parsed as an LDIF entry.
- */
- public final Entry readEntry(Map<DN, Suffix> suffixesMap, Importer.EntryInformation entryInfo) throws IOException,
- LDIFException
- {
- final boolean checkSchema = importConfig.validateSchema();
- while (true)
- {
- LinkedList<StringBuilder> lines;
- DN entryDN;
- EntryID entryID;
- Suffix suffix;
- synchronized (this)
- {
- // Read the set of lines that make up the next entry.
- lines = readEntryLines();
- if (lines == null)
- {
- return null;
- }
- lastEntryBodyLines = lines;
- lastEntryHeaderLines = new LinkedList<StringBuilder>();
-
- // Read the DN of the entry and see if it is one that should be included
- // in the import.
- try
- {
- entryDN = readDN(lines);
- }
- catch (LDIFException e)
- {
- logger.traceException(e);
- continue;
- }
-
- if (entryDN == null)
- {
- // This should only happen if the LDIF starts with the "version:" line
- // and has a blank line immediately after that. In that case, simply
- // read and return the next entry.
- continue;
- }
- else if (!importConfig.includeEntry(entryDN))
- {
- logger.trace("Skipping entry %s because the DN is not one that "
- + "should be included based on the include and exclude branches.", entryDN);
- entriesRead.incrementAndGet();
- logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
- continue;
- }
- suffix = Importer.getMatchSuffix(entryDN, suffixesMap);
- if (suffix == null)
- {
- logger.trace("Skipping entry %s because the DN is not one that "
- + "should be included based on a suffix match check.", entryDN);
- entriesRead.incrementAndGet();
- logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
- continue;
- }
- entriesRead.incrementAndGet();
- entryID = rootContainer.getNextEntryID();
- suffix.addPending(entryDN);
- }
-
- // Create the entry and see if it is one that should be included in the import
- final Entry entry = createEntry(lines, entryDN, checkSchema, suffix);
- if (entry == null
- || !isIncludedInImport(entry, suffix, lines)
- || !invokeImportPlugins(entry, suffix, lines)
- || (checkSchema && !isValidAgainstSchema(entry, suffix, lines)))
- {
- continue;
- }
- entryInfo.setEntryID(entryID);
- entryInfo.setSuffix(suffix);
- // The entry should be included in the import, so return it.
- return entry;
- }
- }
-
- private Entry createEntry(List<StringBuilder> lines, DN entryDN, boolean checkSchema, Suffix suffix)
- {
- // Read the set of attributes from the entry.
- Map<ObjectClass, String> objectClasses = new HashMap<ObjectClass, String>();
- Map<AttributeType, List<AttributeBuilder>> userAttrBuilders =
- new HashMap<AttributeType, List<AttributeBuilder>>();
- Map<AttributeType, List<AttributeBuilder>> operationalAttrBuilders =
- new HashMap<AttributeType, List<AttributeBuilder>>();
- try
- {
- for (StringBuilder line : lines)
- {
- readAttribute(lines, line, entryDN, objectClasses, userAttrBuilders, operationalAttrBuilders, checkSchema);
- }
- }
- catch (LDIFException e)
- {
- if (logger.isTraceEnabled())
- {
- logger.trace("Skipping entry %s because reading" + "its attributes failed.", entryDN);
- }
- logToSkipWriter(lines, ERR_LDIF_READ_ATTR_SKIP.get(entryDN, e.getMessage()));
- suffix.removePending(entryDN);
- return null;
- }
-
- final Entry entry = new Entry(entryDN, objectClasses,
- toAttributesMap(userAttrBuilders), toAttributesMap(operationalAttrBuilders));
- logger.trace("readEntry(), created entry: %s", entry);
- return entry;
- }
-
- private boolean isIncludedInImport(Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
- {
- final DN entryDN = entry.getName();
- try
- {
- if (!importConfig.includeEntry(entry))
- {
- if (logger.isTraceEnabled())
- {
- logger.trace("Skipping entry %s because the DN is not one that "
- + "should be included based on the include and exclude filters.", entryDN);
- }
- logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
- suffix.removePending(entryDN);
- return false;
- }
- }
- catch (Exception e)
- {
- logger.traceException(e);
- suffix.removePending(entryDN);
- logToSkipWriter(lines,
- ERR_LDIF_COULD_NOT_EVALUATE_FILTERS_FOR_IMPORT.get(entry.getName(), lastEntryLineNumber, e));
- suffix.removePending(entryDN);
- return false;
- }
- return true;
- }
-
- private boolean invokeImportPlugins(final Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
- {
- if (importConfig.invokeImportPlugins())
- {
- PluginResult.ImportLDIF pluginResult = pluginConfigManager.invokeLDIFImportPlugins(importConfig, entry);
- if (!pluginResult.continueProcessing())
- {
- final DN entryDN = entry.getName();
- LocalizableMessage m;
- LocalizableMessage rejectMessage = pluginResult.getErrorMessage();
- if (rejectMessage == null)
- {
- m = ERR_LDIF_REJECTED_BY_PLUGIN_NOMESSAGE.get(entryDN);
- }
- else
- {
- m = ERR_LDIF_REJECTED_BY_PLUGIN.get(entryDN, rejectMessage);
- }
-
- logToRejectWriter(lines, m);
- suffix.removePending(entryDN);
- return false;
- }
- }
- return true;
- }
-
- private boolean isValidAgainstSchema(Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
- {
- final DN entryDN = entry.getName();
- addRDNAttributesIfNecessary(entryDN, entry.getUserAttributes(), entry.getOperationalAttributes());
- // Add any superior objectclass(s) missing in the objectclass map.
- addSuperiorObjectClasses(entry.getObjectClasses());
-
- LocalizableMessageBuilder invalidReason = new LocalizableMessageBuilder();
- if (!entry.conformsToSchema(null, false, true, false, invalidReason))
- {
- LocalizableMessage message = ERR_LDIF_SCHEMA_VIOLATION.get(entryDN, lastEntryLineNumber, invalidReason);
- logToRejectWriter(lines, message);
- suffix.removePending(entryDN);
- return false;
- }
- return true;
- }
-}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
deleted file mode 100644
index 992590a..0000000
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
+++ /dev/null
@@ -1,4359 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS
- */
-package org.opends.server.backends.jeb.importLDIF;
-
-import static com.sleepycat.je.EnvironmentConfig.*;
-
-import static org.opends.messages.JebMessages.*;
-import static org.opends.server.admin.std.meta.LocalDBIndexCfgDefn.IndexType.*;
-import static org.opends.server.util.DynamicConstants.*;
-import static org.opends.server.util.ServerConstants.*;
-import static org.opends.server.util.StaticUtils.*;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.forgerock.i18n.LocalizableMessage;
-import org.forgerock.i18n.LocalizableMessageDescriptor.Arg3;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.forgerock.opendj.config.server.ConfigException;
-import org.forgerock.opendj.ldap.ByteString;
-import org.forgerock.opendj.ldap.ResultCode;
-import org.forgerock.opendj.ldap.spi.IndexingOptions;
-import org.forgerock.util.Utils;
-import org.opends.server.admin.std.meta.LocalDBIndexCfgDefn.IndexType;
-import org.opends.server.admin.std.server.LocalDBBackendCfg;
-import org.opends.server.admin.std.server.LocalDBIndexCfg;
-import org.opends.server.api.DiskSpaceMonitorHandler;
-import org.opends.server.backends.RebuildConfig;
-import org.opends.server.backends.RebuildConfig.RebuildMode;
-import org.opends.server.backends.jeb.*;
-import org.opends.server.backends.jeb.RootContainer;
-import org.opends.server.backends.jeb.VLVIndex;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.extensions.DiskSpaceMonitor;
-import org.opends.server.types.*;
-import org.opends.server.util.Platform;
-import org.opends.server.util.StaticUtils;
-
-import com.sleepycat.je.*;
-import com.sleepycat.util.PackedInteger;
-
-/**
- * This class provides the engine that performs both importing of LDIF files and
- * the rebuilding of indexes.
- */
-public final class Importer implements DiskSpaceMonitorHandler
-{
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
- private static final int TIMER_INTERVAL = 10000;
- private static final int KB = 1024;
- private static final int MB = KB * KB;
- private static final String DEFAULT_TMP_DIR = "import-tmp";
- private static final String TMPENV_DIR = "tmp-env";
-
- /** Defaults for DB cache. */
- private static final int MAX_DB_CACHE_SIZE = 8 * MB;
- private static final int MAX_DB_LOG_SIZE = 10 * MB;
- private static final int MIN_DB_CACHE_SIZE = 4 * MB;
-
- /**
- * Defaults for LDIF reader buffers, min memory required to import and default
- * size for byte buffers.
- */
- private static final int READER_WRITER_BUFFER_SIZE = 8 * KB;
- private static final int MIN_DB_CACHE_MEMORY = MAX_DB_CACHE_SIZE
- + MAX_DB_LOG_SIZE;
- private static final int BYTE_BUFFER_CAPACITY = 128;
-
- /** Max size of phase one buffer. */
- private static final int MAX_BUFFER_SIZE = 2 * MB;
- /** Min size of phase one buffer. */
- private static final int MIN_BUFFER_SIZE = 4 * KB;
- /** Min size of phase two read-ahead cache. */
- private static final int MIN_READ_AHEAD_CACHE_SIZE = 2 * KB;
- /** Small heap threshold used to give more memory to JVM to attempt OOM errors. */
- private static final int SMALL_HEAP_SIZE = 256 * MB;
-
- /** The DN attribute type. */
- private static AttributeType dnType;
- static final IndexOutputBuffer.IndexComparator indexComparator =
- new IndexOutputBuffer.IndexComparator();
-
- /** Phase one buffer count. */
- private final AtomicInteger bufferCount = new AtomicInteger(0);
- /** Phase one imported entries count. */
- private final AtomicLong importCount = new AtomicLong(0);
-
- /** Phase one buffer size in bytes. */
- private int bufferSize;
-
- /** Temp scratch directory. */
- private final File tempDir;
-
- /** Index count. */
- private final int indexCount;
- /** Thread count. */
- private int threadCount;
-
- /** Set to true when validation is skipped. */
- private final boolean skipDNValidation;
-
- /** Temporary environment used when DN validation is done in first phase. */
- private final TmpEnv tmpEnv;
-
- /** Root container. */
- private RootContainer rootContainer;
-
- /** Import configuration. */
- private final LDIFImportConfig importConfiguration;
- /** Backend configuration. */
- private final LocalDBBackendCfg backendConfiguration;
-
- /** LDIF reader. */
- private ImportLDIFReader reader;
-
- /** Migrated entry count. */
- private int migratedCount;
-
- /** Size in bytes of temporary env. */
- private long tmpEnvCacheSize;
- /** Available memory at the start of the import. */
- private long availableMemory;
- /** Size in bytes of DB cache. */
- private long dbCacheSize;
-
- /** The executor service used for the buffer sort tasks. */
- private ExecutorService bufferSortService;
- /** The executor service used for the scratch file processing tasks. */
- private ExecutorService scratchFileWriterService;
-
- /** Queue of free index buffers -- used to re-cycle index buffers. */
- private final BlockingQueue<IndexOutputBuffer> freeBufferQueue =
- new LinkedBlockingQueue<IndexOutputBuffer>();
-
- /**
- * Map of index keys to index buffers. Used to allocate sorted index buffers
- * to a index writer thread.
- */
- private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueueMap =
- new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>();
-
- /** Map of DB containers to index managers. Used to start phase 2. */
- private final List<IndexManager> indexMgrList =
- new LinkedList<IndexManager>();
- /** Map of DB containers to DN-based index managers. Used to start phase 2. */
- private final List<IndexManager> DNIndexMgrList =
- new LinkedList<IndexManager>();
-
- /**
- * Futures used to indicate when the index file writers are done flushing
- * their work queues and have exited. End of phase one.
- */
- private final List<Future<Void>> scratchFileWriterFutures;
- /**
- * List of index file writer tasks. Used to signal stopScratchFileWriters to
- * the index file writer tasks when the LDIF file has been done.
- */
- private final List<ScratchFileWriterTask> scratchFileWriterList;
-
- /** Map of DNs to Suffix objects. */
- private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();
- /** Map of container ids to database containers. */
- private final ConcurrentHashMap<Integer, Index> idContainerMap = new ConcurrentHashMap<Integer, Index>();
- /** Map of container ids to entry containers. */
- private final ConcurrentHashMap<Integer, EntryContainer> idECMap =
- new ConcurrentHashMap<Integer, EntryContainer>();
-
- /** Used to synchronize when a scratch file index writer is first setup. */
- private final Object synObj = new Object();
-
- /** Rebuild index manager used when rebuilding indexes. */
- private final RebuildIndexManager rebuildManager;
-
- /** Set to true if the backend was cleared. */
- private final boolean clearedBackend;
-
- /** Used to shutdown import if an error occurs in phase one. */
- private volatile boolean isCanceled;
- private volatile boolean isPhaseOneDone;
-
- /** Number of phase one buffers. */
- private int phaseOneBufferCount;
-
- static
- {
- dnType = DirectoryServer.getAttributeType("dn");
- if (dnType == null)
- {
- dnType = DirectoryServer.getDefaultAttributeType("dn");
- }
- }
-
- /**
- * Create a new import job with the specified rebuild index config.
- *
- * @param rebuildConfig
- * The rebuild index configuration.
- * @param cfg
- * The local DB back-end configuration.
- * @param envConfig
- * The JEB environment config.
- * @throws InitializationException
- * If a problem occurs during initialization.
- * @throws JebException
- * If an error occurred when opening the DB.
- * @throws ConfigException
- * If a problem occurs during initialization.
- */
- public Importer(RebuildConfig rebuildConfig, LocalDBBackendCfg cfg,
- EnvironmentConfig envConfig) throws InitializationException,
- JebException, ConfigException
- {
- this.importConfiguration = null;
- this.backendConfiguration = cfg;
- this.tmpEnv = null;
- this.threadCount = 1;
- this.rebuildManager = new RebuildIndexManager(rebuildConfig, cfg);
- this.indexCount = rebuildManager.getIndexCount();
- this.clearedBackend = false;
- this.scratchFileWriterList =
- new ArrayList<ScratchFileWriterTask>(indexCount);
- this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<Void>>();
-
- this.tempDir = getTempDir(cfg, rebuildConfig.getTmpDirectory());
- recursiveDelete(tempDir);
- if (!tempDir.exists() && !tempDir.mkdirs())
- {
- throw new InitializationException(ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir));
- }
- this.skipDNValidation = true;
- initializeDBEnv(envConfig);
- }
-
- /**
- * Create a new import job with the specified ldif import config.
- *
- * @param importConfiguration
- * The LDIF import configuration.
- * @param localDBBackendCfg
- * The local DB back-end configuration.
- * @param envConfig
- * The JEB environment config.
- * @throws InitializationException
- * If a problem occurs during initialization.
- * @throws ConfigException
- * If a problem occurs reading the configuration.
- * @throws DatabaseException
- * If an error occurred when opening the DB.
- */
- public Importer(LDIFImportConfig importConfiguration,
- LocalDBBackendCfg localDBBackendCfg, EnvironmentConfig envConfig)
- throws InitializationException, ConfigException, DatabaseException
- {
- this.rebuildManager = null;
- this.importConfiguration = importConfiguration;
- this.backendConfiguration = localDBBackendCfg;
-
- if (importConfiguration.getThreadCount() == 0)
- {
- this.threadCount = Runtime.getRuntime().availableProcessors() * 2;
- }
- else
- {
- this.threadCount = importConfiguration.getThreadCount();
- }
-
- // Determine the number of indexes.
- this.indexCount = getTotalIndexCount(localDBBackendCfg);
-
- this.clearedBackend = mustClearBackend(importConfiguration, localDBBackendCfg);
- this.scratchFileWriterList =
- new ArrayList<ScratchFileWriterTask>(indexCount);
- this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<Void>>();
-
- this.tempDir = getTempDir(localDBBackendCfg, importConfiguration.getTmpDirectory());
- recursiveDelete(tempDir);
- if (!tempDir.exists() && !tempDir.mkdirs())
- {
- throw new InitializationException(ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir));
- }
- skipDNValidation = importConfiguration.getSkipDNValidation();
- initializeDBEnv(envConfig);
-
- // Set up temporary environment.
- if (!skipDNValidation)
- {
- File envPath = new File(tempDir, TMPENV_DIR);
- envPath.mkdirs();
- this.tmpEnv = new TmpEnv(envPath);
- }
- else
- {
- this.tmpEnv = null;
- }
- }
-
- /**
- * Returns whether the backend must be cleared.
- *
- * @param importCfg the import configuration object
- * @param backendCfg the backend configuration object
- * @return true if the backend must be cleared, false otherwise
- */
- public static boolean mustClearBackend(LDIFImportConfig importCfg, LocalDBBackendCfg backendCfg)
- {
- return !importCfg.appendToExistingData()
- && (importCfg.clearBackend() || backendCfg.getBaseDN().size() <= 1);
- }
-
- private File getTempDir(LocalDBBackendCfg localDBBackendCfg, String tmpDirectory)
- {
- File parentDir;
- if (tmpDirectory != null)
- {
- parentDir = getFileForPath(tmpDirectory);
- }
- else
- {
- parentDir = getFileForPath(DEFAULT_TMP_DIR);
- }
- return new File(parentDir, localDBBackendCfg.getBackendId());
- }
-
- private int getTotalIndexCount(LocalDBBackendCfg localDBBackendCfg)
- throws ConfigException
- {
- int indexes = 2; // dn2id, dn2uri
- for (String indexName : localDBBackendCfg.listLocalDBIndexes())
- {
- LocalDBIndexCfg index = localDBBackendCfg.getLocalDBIndex(indexName);
- SortedSet<IndexType> types = index.getIndexType();
- if (types.contains(IndexType.EXTENSIBLE))
- {
- indexes += types.size() - 1 + index.getIndexExtensibleMatchingRule().size();
- }
- else
- {
- indexes += types.size();
- }
- }
- return indexes;
- }
-
- /**
- * Return the suffix instance in the specified map that matches the specified
- * DN.
- *
- * @param dn
- * The DN to search for.
- * @param map
- * The map to search.
- * @return The suffix instance that matches the DN, or null if no match is
- * found.
- */
- public static Suffix getMatchSuffix(DN dn, Map<DN, Suffix> map)
- {
- Suffix suffix = null;
- DN nodeDN = dn;
-
- while (suffix == null && nodeDN != null)
- {
- suffix = map.get(nodeDN);
- if (suffix == null)
- {
- nodeDN = nodeDN.getParentDNInSuffix();
- }
- }
- return suffix;
- }
-
- /**
- * Calculate buffer sizes and initialize JEB properties based on memory.
- *
- * @param envConfig
- * The environment config to use in the calculations.
- * @throws InitializationException
- * If a problem occurs during calculation.
- */
- private void initializeDBEnv(EnvironmentConfig envConfig) throws InitializationException
- {
- // Calculate amount of usable memory. This will need to take into account
- // various fudge factors, including the number of IO buffers used by the
- // scratch writers (1 per index).
- calculateAvailableMemory();
-
- final long usableMemory = availableMemory - (indexCount * READER_WRITER_BUFFER_SIZE);
-
- // We need caching when doing DN validation or rebuilding indexes.
- if (!skipDNValidation || rebuildManager != null)
- {
- // No DN validation: calculate memory for DB cache, DN2ID temporary cache,
- // and buffers.
- if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
- {
- dbCacheSize = 500 * KB;
- tmpEnvCacheSize = 500 * KB;
- }
- else if (usableMemory < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
- {
- dbCacheSize = MIN_DB_CACHE_SIZE;
- tmpEnvCacheSize = MIN_DB_CACHE_SIZE;
- }
- else if (!clearedBackend)
- {
- // Appending to existing data so reserve extra memory for the DB cache
- // since it will be needed for dn2id queries.
- dbCacheSize = usableMemory * 33 / 100;
- tmpEnvCacheSize = usableMemory * 33 / 100;
- }
- else
- {
- dbCacheSize = MAX_DB_CACHE_SIZE;
- tmpEnvCacheSize = usableMemory * 66 / 100;
- }
- }
- else
- {
- // No DN validation: calculate memory for DB cache and buffers.
-
- // No need for DN2ID cache.
- tmpEnvCacheSize = 0;
-
- if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
- {
- dbCacheSize = 500 * KB;
- }
- else if (usableMemory < MIN_DB_CACHE_MEMORY)
- {
- dbCacheSize = MIN_DB_CACHE_SIZE;
- }
- else
- {
- // No need to differentiate between append/clear backend, since dn2id is
- // not being queried.
- dbCacheSize = MAX_DB_CACHE_SIZE;
- }
- }
-
- final long phaseOneBufferMemory = usableMemory - dbCacheSize - tmpEnvCacheSize;
- final int oldThreadCount = threadCount;
- if (indexCount != 0) // Avoid / by zero
- {
- while (true)
- {
- phaseOneBufferCount = 2 * indexCount * threadCount;
-
- // Scratch writers allocate 4 buffers per index as well.
- final int totalPhaseOneBufferCount = phaseOneBufferCount + (4 * indexCount);
- long longBufferSize = phaseOneBufferMemory / totalPhaseOneBufferCount;
- // We need (2 * bufferSize) to fit in an int for the insertByteStream
- // and deleteByteStream constructors.
- bufferSize = (int) Math.min(longBufferSize, Integer.MAX_VALUE / 2);
-
- if (bufferSize > MAX_BUFFER_SIZE)
- {
- if (!skipDNValidation)
- {
- // The buffers are big enough: the memory is best used for the DN2ID temp DB
- bufferSize = MAX_BUFFER_SIZE;
-
- final long extraMemory = phaseOneBufferMemory - (totalPhaseOneBufferCount * bufferSize);
- if (!clearedBackend)
- {
- dbCacheSize += extraMemory / 2;
- tmpEnvCacheSize += extraMemory / 2;
- }
- else
- {
- tmpEnvCacheSize += extraMemory;
- }
- }
-
- break;
- }
- else if (bufferSize > MIN_BUFFER_SIZE)
- {
- // This is acceptable.
- break;
- }
- else if (threadCount > 1)
- {
- // Retry using less threads.
- threadCount--;
- }
- else
- {
- // Not enough memory.
- final long minimumPhaseOneBufferMemory = totalPhaseOneBufferCount * MIN_BUFFER_SIZE;
- LocalizableMessage message =
- ERR_IMPORT_LDIF_LACK_MEM.get(usableMemory,
- minimumPhaseOneBufferMemory + dbCacheSize + tmpEnvCacheSize);
- throw new InitializationException(message);
- }
- }
- }
-
- if (oldThreadCount != threadCount)
- {
- logger.info(NOTE_JEB_IMPORT_ADJUST_THREAD_COUNT, oldThreadCount, threadCount);
- }
-
- logger.info(NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, phaseOneBufferCount);
- if (tmpEnvCacheSize > 0)
- {
- logger.info(NOTE_JEB_IMPORT_LDIF_TMP_ENV_MEM, tmpEnvCacheSize);
- }
- envConfig.setConfigParam(MAX_MEMORY, Long.toString(dbCacheSize));
- logger.info(NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO, dbCacheSize, bufferSize);
- }
-
- /**
- * Calculates the amount of available memory which can be used by this import,
- * taking into account whether or not the import is running offline or online
- * as a task.
- */
- private void calculateAvailableMemory()
- {
- final long totalAvailableMemory;
- if (DirectoryServer.isRunning())
- {
- // Online import/rebuild.
- Runtime runTime = Runtime.getRuntime();
- // call twice gc to ensure finalizers are called
- // and young to old gen references are properly gc'd
- runTime.gc();
- runTime.gc();
- final long usedMemory = runTime.totalMemory() - runTime.freeMemory();
- final long maxUsableMemory = Platform.getUsableMemoryForCaching();
- final long usableMemory = maxUsableMemory - usedMemory;
-
- final long configuredMemory;
- if (backendConfiguration.getDBCacheSize() > 0)
- {
- configuredMemory = backendConfiguration.getDBCacheSize();
- }
- else
- {
- configuredMemory = backendConfiguration.getDBCachePercent() * Runtime.getRuntime().maxMemory() / 100;
- }
-
- // Round up to minimum of 16MB (e.g. unit tests only use 2% cache).
- totalAvailableMemory = Math.max(Math.min(usableMemory, configuredMemory), 16 * MB);
- }
- else
- {
- // Offline import/rebuild.
- totalAvailableMemory = Platform.getUsableMemoryForCaching();
- }
-
- // Now take into account various fudge factors.
- int importMemPct = 90;
- if (totalAvailableMemory <= SMALL_HEAP_SIZE)
- {
- // Be pessimistic when memory is low.
- importMemPct -= 25;
- }
- if (rebuildManager != null)
- {
- // Rebuild seems to require more overhead.
- importMemPct -= 15;
- }
-
- availableMemory = totalAvailableMemory * importMemPct / 100;
- }
-
- private void initializeIndexBuffers()
- {
- for (int i = 0; i < phaseOneBufferCount; i++)
- {
- freeBufferQueue.add(new IndexOutputBuffer(bufferSize));
- }
- }
-
- private void initializeSuffixes() throws DatabaseException, ConfigException,
- InitializationException
- {
- for (EntryContainer ec : rootContainer.getEntryContainers())
- {
- Suffix suffix = getSuffix(ec);
- if (suffix != null)
- {
- dnSuffixMap.put(ec.getBaseDN(), suffix);
- generateIndexID(suffix);
- }
- }
- }
-
- /**
- * Mainly used to support multiple suffixes. Each index in each suffix gets an
- * unique ID to identify which DB it needs to go to in phase two processing.
- */
- private void generateIndexID(Suffix suffix)
- {
- for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
- {
- AttributeIndex attributeIndex = mapEntry.getValue();
- putInIdContainerMap(attributeIndex.getEqualityIndex());
- putInIdContainerMap(attributeIndex.getPresenceIndex());
- putInIdContainerMap(attributeIndex.getSubstringIndex());
- putInIdContainerMap(attributeIndex.getOrderingIndex());
- putInIdContainerMap(attributeIndex.getApproximateIndex());
- Map<String, Collection<Index>> extensibleMap = attributeIndex.getExtensibleIndexes();
- if (!extensibleMap.isEmpty())
- {
- putInIdContainerMap(extensibleMap.get(EXTENSIBLE_INDEXER_ID_SUBSTRING));
- putInIdContainerMap(extensibleMap.get(EXTENSIBLE_INDEXER_ID_SHARED));
- }
- }
- }
-
- private void putInIdContainerMap(Collection<Index> indexes)
- {
- if (indexes != null)
- {
- for (Index index : indexes)
- {
- putInIdContainerMap(index);
- }
- }
- }
-
- private void putInIdContainerMap(Index index)
- {
- if (index != null)
- {
- int id = System.identityHashCode(index);
- idContainerMap.putIfAbsent(id, index);
- }
- }
-
- private Suffix getSuffix(EntryContainer entryContainer)
- throws ConfigException, InitializationException
- {
- DN baseDN = entryContainer.getBaseDN();
- EntryContainer sourceEntryContainer = null;
- List<DN> includeBranches = new ArrayList<DN>();
- List<DN> excludeBranches = new ArrayList<DN>();
-
- if (!importConfiguration.appendToExistingData()
- && !importConfiguration.clearBackend())
- {
- for (DN dn : importConfiguration.getExcludeBranches())
- {
- if (baseDN.equals(dn))
- {
- // This entire base DN was explicitly excluded. Skip.
- return null;
- }
- if (baseDN.isAncestorOf(dn))
- {
- excludeBranches.add(dn);
- }
- }
-
- if (!importConfiguration.getIncludeBranches().isEmpty())
- {
- for (DN dn : importConfiguration.getIncludeBranches())
- {
- if (baseDN.isAncestorOf(dn))
- {
- includeBranches.add(dn);
- }
- }
-
- if (includeBranches.isEmpty())
- {
- /*
- * There are no branches in the explicitly defined include list under
- * this base DN. Skip this base DN all together.
- */
- return null;
- }
-
- // Remove any overlapping include branches.
- Iterator<DN> includeBranchIterator = includeBranches.iterator();
- while (includeBranchIterator.hasNext())
- {
- DN includeDN = includeBranchIterator.next();
- if (!isAnyNotEqualAndAncestorOf(includeBranches, includeDN))
- {
- includeBranchIterator.remove();
- }
- }
-
- // Remove any exclude branches that are not are not under a include
- // branch since they will be migrated as part of the existing entries
- // outside of the include branches anyways.
- Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
- while (excludeBranchIterator.hasNext())
- {
- DN excludeDN = excludeBranchIterator.next();
- if (!isAnyAncestorOf(includeBranches, excludeDN))
- {
- excludeBranchIterator.remove();
- }
- }
-
- if (excludeBranches.isEmpty()
- && includeBranches.size() == 1
- && includeBranches.get(0).equals(baseDN))
- {
- // This entire base DN is explicitly included in the import with
- // no exclude branches that we need to migrate. Just clear the entry
- // container.
- clearSuffix(entryContainer);
- }
- else
- {
- // Create a temp entry container
- sourceEntryContainer = entryContainer;
- entryContainer = rootContainer.openEntryContainer(baseDN, baseDN.toIrreversibleReadableString()
- + "_importTmp");
- }
- }
- }
- return new Suffix(entryContainer, sourceEntryContainer, includeBranches, excludeBranches);
- }
-
- private void clearSuffix(EntryContainer entryContainer)
- {
- entryContainer.lock();
- entryContainer.clear();
- entryContainer.unlock();
- }
-
- private boolean isAnyNotEqualAndAncestorOf(List<DN> dns, DN childDN)
- {
- for (DN dn : dns)
- {
- if (!dn.equals(childDN) && dn.isAncestorOf(childDN))
- {
- return false;
- }
- }
- return true;
- }
-
- private boolean isAnyAncestorOf(List<DN> dns, DN childDN)
- {
- for (DN dn : dns)
- {
- if (dn.isAncestorOf(childDN))
- {
- return true;
- }
- }
- return false;
- }
-
- /**
- * Rebuild the indexes using the specified root container.
- *
- * @param rootContainer
- * The root container to rebuild indexes in.
- * @throws ConfigException
- * If a configuration error occurred.
- * @throws InitializationException
- * If an initialization error occurred.
- * @throws JebException
- * If the JEB database had an error.
- * @throws InterruptedException
- * If an interrupted error occurred.
- * @throws ExecutionException
- * If an execution error occurred.
- */
- public void rebuildIndexes(RootContainer rootContainer)
- throws ConfigException, InitializationException, JebException,
- InterruptedException, ExecutionException
- {
- this.rootContainer = rootContainer;
- long startTime = System.currentTimeMillis();
-
- DiskSpaceMonitor tmpMonitor = createDiskSpaceMonitor(tempDir, "backend index rebuild tmp directory");
- tmpMonitor.initializeMonitorProvider(null);
- DirectoryServer.registerMonitorProvider(tmpMonitor);
- File parentDirectory = getFileForPath(backendConfiguration.getDBDirectory());
- File backendDirectory = new File(parentDirectory, backendConfiguration.getBackendId());
- DiskSpaceMonitor dbMonitor = createDiskSpaceMonitor(backendDirectory, "backend index rebuild DB directory");
- dbMonitor.initializeMonitorProvider(null);
- DirectoryServer.registerMonitorProvider(dbMonitor);
-
- try
- {
- rebuildManager.initialize();
- rebuildManager.printStartMessage();
- rebuildManager.rebuildIndexes();
- recursiveDelete(tempDir);
- rebuildManager.printStopMessage(startTime);
- }
- finally
- {
- DirectoryServer.deregisterMonitorProvider(tmpMonitor);
- DirectoryServer.deregisterMonitorProvider(dbMonitor);
- tmpMonitor.finalizeMonitorProvider();
- dbMonitor.finalizeMonitorProvider();
- }
- }
-
- /**
- * Import a LDIF using the specified root container.
- *
- * @param rootContainer
- * The root container to use during the import.
- * @return A LDIF result.
- * @throws ConfigException
- * If the import failed because of an configuration error.
- * @throws InitializationException
- * If the import failed because of an initialization error.
- * @throws JebException
- * If the import failed due to a database error.
- * @throws InterruptedException
- * If the import failed due to an interrupted error.
- * @throws ExecutionException
- * If the import failed due to an execution error.
- */
- public LDIFImportResult processImport(RootContainer rootContainer)
- throws ConfigException, InitializationException, JebException,
- InterruptedException, ExecutionException
- {
- this.rootContainer = rootContainer;
- DiskSpaceMonitor tmpMonitor = null;
- DiskSpaceMonitor dbMonitor = null;
- try {
- try
- {
- reader = new ImportLDIFReader(importConfiguration, rootContainer);
- }
- catch (IOException ioe)
- {
- LocalizableMessage message = ERR_JEB_IMPORT_LDIF_READER_IO_ERROR.get();
- throw new InitializationException(message, ioe);
- }
-
- tmpMonitor = createDiskSpaceMonitor(tempDir, "backend import tmp directory");
- tmpMonitor.initializeMonitorProvider(null);
- DirectoryServer.registerMonitorProvider(tmpMonitor);
- File parentDirectory = getFileForPath(backendConfiguration.getDBDirectory());
- File backendDirectory = new File(parentDirectory, backendConfiguration.getBackendId());
- dbMonitor = createDiskSpaceMonitor(backendDirectory, "backend import DB directory");
- dbMonitor.initializeMonitorProvider(null);
- DirectoryServer.registerMonitorProvider(dbMonitor);
-
- logger.info(NOTE_JEB_IMPORT_STARTING, DirectoryServer.getVersionString(),
- BUILD_ID, REVISION_NUMBER);
- logger.info(NOTE_JEB_IMPORT_THREAD_COUNT, threadCount);
- initializeSuffixes();
- setIndexesTrusted(false);
-
- final long startTime = System.currentTimeMillis();
- phaseOne();
- isPhaseOneDone = true;
- final long phaseOneFinishTime = System.currentTimeMillis();
-
- if (!skipDNValidation)
- {
- tmpEnv.shutdown();
- }
- if (isCanceled)
- {
- throw new InterruptedException("Import processing canceled.");
- }
-
- final long phaseTwoTime = System.currentTimeMillis();
- phaseTwo();
- if (isCanceled)
- {
- throw new InterruptedException("Import processing canceled.");
- }
- final long phaseTwoFinishTime = System.currentTimeMillis();
-
- setIndexesTrusted(true);
- switchEntryContainers();
- recursiveDelete(tempDir);
- final long finishTime = System.currentTimeMillis();
- final long importTime = finishTime - startTime;
- logger.info(NOTE_JEB_IMPORT_PHASE_STATS, importTime / 1000,
- (phaseOneFinishTime - startTime) / 1000,
- (phaseTwoFinishTime - phaseTwoTime) / 1000);
- float rate = 0;
- if (importTime > 0)
- {
- rate = 1000f * reader.getEntriesRead() / importTime;
- }
- logger.info(NOTE_JEB_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount.get(),
- reader.getEntriesIgnored(), reader.getEntriesRejected(),
- migratedCount, importTime / 1000, rate);
- return new LDIFImportResult(reader.getEntriesRead(),
- reader.getEntriesRejected(), reader.getEntriesIgnored());
- }
- finally
- {
- close(reader);
- if (!skipDNValidation)
- {
- try
- {
- tmpEnv.shutdown();
- }
- catch (Exception ignored)
- {
- // Do nothing.
- }
- }
- if (tmpMonitor != null)
- {
- DirectoryServer.deregisterMonitorProvider(tmpMonitor);
- tmpMonitor.finalizeMonitorProvider();
- }
- if (dbMonitor != null)
- {
- DirectoryServer.deregisterMonitorProvider(dbMonitor);
- dbMonitor.finalizeMonitorProvider();
- }
- }
- }
-
- private DiskSpaceMonitor createDiskSpaceMonitor(File dir, String backendSuffix)
- {
- final LocalDBBackendCfg cfg = backendConfiguration;
- return new DiskSpaceMonitor(cfg.getBackendId() + " " + backendSuffix, dir,
- cfg.getDiskLowThreshold(), cfg.getDiskFullThreshold(), 5, TimeUnit.SECONDS, this);
- }
-
- private void recursiveDelete(File dir)
- {
- if (dir.listFiles() != null)
- {
- for (File f : dir.listFiles())
- {
- if (f.isDirectory())
- {
- recursiveDelete(f);
- }
- f.delete();
- }
- }
- dir.delete();
- }
-
- private void switchEntryContainers() throws DatabaseException, JebException, InitializationException
- {
- for (Suffix suffix : dnSuffixMap.values())
- {
- DN baseDN = suffix.getBaseDN();
- EntryContainer entryContainer = suffix.getSrcEntryContainer();
- if (entryContainer != null)
- {
- final EntryContainer toDelete = rootContainer.unregisterEntryContainer(baseDN);
- toDelete.lock();
- toDelete.close();
- toDelete.delete();
- toDelete.unlock();
-
- final EntryContainer replacement = suffix.getEntryContainer();
- replacement.lock();
- replacement.setDatabasePrefix(baseDN.toIrreversibleReadableString());
- replacement.unlock();
- rootContainer.registerEntryContainer(baseDN, replacement);
- }
- }
- }
-
- private void setIndexesTrusted(boolean trusted) throws JebException
- {
- try
- {
- for (Suffix s : dnSuffixMap.values())
- {
- s.setIndexesTrusted(trusted);
- }
- }
- catch (DatabaseException ex)
- {
- throw new JebException(NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage()));
- }
- }
-
- private void phaseOne() throws InterruptedException, ExecutionException
- {
- initializeIndexBuffers();
-
- final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
- scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
- scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
- bufferSortService = Executors.newFixedThreadPool(threadCount);
- final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
-
- final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
- tasks.add(new MigrateExistingTask());
- getAll(execService.invokeAll(tasks));
- tasks.clear();
-
- if (importConfiguration.appendToExistingData()
- && importConfiguration.replaceExistingEntries())
- {
- for (int i = 0; i < threadCount; i++)
- {
- tasks.add(new AppendReplaceTask());
- }
- }
- else
- {
- for (int i = 0; i < threadCount; i++)
- {
- tasks.add(new ImportTask());
- }
- }
- getAll(execService.invokeAll(tasks));
- tasks.clear();
-
- tasks.add(new MigrateExcludedTask());
- getAll(execService.invokeAll(tasks));
-
- stopScratchFileWriters();
- getAll(scratchFileWriterFutures);
-
- shutdownAll(timerService, execService, bufferSortService, scratchFileWriterService);
-
- // Try to clear as much memory as possible.
- clearAll(scratchFileWriterList, scratchFileWriterFutures, freeBufferQueue);
- indexKeyQueueMap.clear();
- }
-
- private void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
- {
- timerService.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL, TimeUnit.MILLISECONDS);
- }
-
- private void shutdownAll(ExecutorService... executorServices) throws InterruptedException
- {
- for (ExecutorService executorService : executorServices)
- {
- executorService.shutdown();
- }
- for (ExecutorService executorService : executorServices)
- {
- executorService.awaitTermination(30, TimeUnit.SECONDS);
- }
- }
-
- private void clearAll(Collection<?>... cols)
- {
- for (Collection<?> col : cols)
- {
- col.clear();
- }
- }
-
- private void phaseTwo() throws InterruptedException, ExecutionException
- {
- ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
- scheduleAtFixedRate(timerService, new SecondPhaseProgressTask(reader.getEntriesRead()));
- try
- {
- processIndexFiles();
- }
- finally
- {
- shutdownAll(timerService);
- }
- }
-
- private void processIndexFiles() throws InterruptedException, ExecutionException
- {
- if (bufferCount.get() == 0)
- {
- return;
- }
- int dbThreads = Runtime.getRuntime().availableProcessors();
- if (dbThreads < 4)
- {
- dbThreads = 4;
- }
-
- // Calculate memory / buffer counts.
- final long usableMemory = availableMemory - dbCacheSize;
- int readAheadSize;
- int buffers;
- while (true)
- {
- final List<IndexManager> allIndexMgrs = new ArrayList<IndexManager>(DNIndexMgrList);
- allIndexMgrs.addAll(indexMgrList);
- Collections.sort(allIndexMgrs, Collections.reverseOrder());
-
- buffers = 0;
- final int limit = Math.min(dbThreads, allIndexMgrs.size());
- for (int i = 0; i < limit; i++)
- {
- buffers += allIndexMgrs.get(i).numberOfBuffers;
- }
-
- readAheadSize = (int) (usableMemory / buffers);
- if (readAheadSize > bufferSize)
- {
- // Cache size is never larger than the buffer size.
- readAheadSize = bufferSize;
- break;
- }
- else if (readAheadSize > MIN_READ_AHEAD_CACHE_SIZE)
- {
- // This is acceptable.
- break;
- }
- else if (dbThreads > 1)
- {
- // Reduce thread count.
- dbThreads--;
- }
- else
- {
- // Not enough memory - will need to do batching for the biggest indexes.
- readAheadSize = MIN_READ_AHEAD_CACHE_SIZE;
- buffers = (int) (usableMemory / readAheadSize);
-
- logger.warn(WARN_IMPORT_LDIF_LACK_MEM_PHASE_TWO, usableMemory);
- break;
- }
- }
-
- // Ensure that there are minimum two threads available for parallel
- // processing of smaller indexes.
- dbThreads = Math.max(2, dbThreads);
-
- logger.info(NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT, availableMemory, readAheadSize, buffers);
-
- // Start indexing tasks.
- List<Future<Void>> futures = new LinkedList<Future<Void>>();
- ExecutorService dbService = Executors.newFixedThreadPool(dbThreads);
- Semaphore permits = new Semaphore(buffers);
-
- // Start DN processing first.
- submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
- submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
- getAll(futures);
- shutdownAll(dbService);
- }
-
- private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService, Semaphore permits,
- int buffers, int readAheadSize, List<Future<Void>> futures)
- {
- for (IndexManager indexMgr : indexMgrs)
- {
- futures.add(dbService.submit(new IndexDBWriteTask(indexMgr, permits, buffers, readAheadSize)));
- }
- }
-
- private <T> void getAll(List<Future<T>> futures) throws InterruptedException, ExecutionException
- {
- for (Future<?> result : futures)
- {
- result.get();
- }
- }
-
- private void stopScratchFileWriters()
- {
- final IndexOutputBuffer stopProcessing = IndexOutputBuffer.poison();
- for (ScratchFileWriterTask task : scratchFileWriterList)
- {
- task.queue.add(stopProcessing);
- }
- }
-
- /** Task used to migrate excluded branch. */
- private final class MigrateExcludedTask extends ImportTask
- {
-
- /** {@inheritDoc} */
- @Override
- public Void call() throws Exception
- {
- for (Suffix suffix : dnSuffixMap.values())
- {
- EntryContainer entryContainer = suffix.getSrcEntryContainer();
- if (entryContainer != null && !suffix.getExcludeBranches().isEmpty())
- {
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- LockMode lockMode = LockMode.DEFAULT;
- logger.info(NOTE_JEB_IMPORT_MIGRATION_START, "excluded", suffix.getBaseDN());
- Cursor cursor = entryContainer.getDN2ID().openCursor(null, CursorConfig.READ_COMMITTED);
- Comparator<byte[]> comparator = entryContainer.getDN2ID().getComparator();
- try
- {
- for (DN excludedDN : suffix.getExcludeBranches())
- {
- byte[] bytes = JebFormat.dnToDNKey(excludedDN, suffix.getBaseDN().size());
- key.setData(bytes);
- OperationStatus status = cursor.getSearchKeyRange(key, data, lockMode);
- if (status == OperationStatus.SUCCESS
- && Arrays.equals(key.getData(), bytes))
- {
- // This is the base entry for a branch that was excluded in the
- // import so we must migrate all entries in this branch over to
- // the new entry container.
- byte[] end = Arrays.copyOf(bytes, bytes.length + 1);
- end[end.length - 1] = 0x01;
-
- while (status == OperationStatus.SUCCESS
- && comparator.compare(key.getData(), end) < 0
- && !importConfiguration.isCancelled() && !isCanceled)
- {
- EntryID id = new EntryID(data);
- Entry entry = entryContainer.getID2Entry().get(null, id, LockMode.DEFAULT);
- processEntry(entry, rootContainer.getNextEntryID(), suffix);
- migratedCount++;
- status = cursor.getNext(key, data, lockMode);
- }
- }
- }
- flushIndexBuffers();
- }
- catch (Exception e)
- {
- logger.error(ERR_JEB_IMPORT_LDIF_MIGRATE_EXCLUDED_TASK_ERR, e.getMessage());
- isCanceled = true;
- throw e;
- }
- finally
- {
- close(cursor);
- }
- }
- }
- return null;
- }
- }
-
- /** Task to migrate existing entries. */
- private final class MigrateExistingTask extends ImportTask
- {
-
- /** {@inheritDoc} */
- @Override
- public Void call() throws Exception
- {
- for (Suffix suffix : dnSuffixMap.values())
- {
- EntryContainer entryContainer = suffix.getSrcEntryContainer();
- if (entryContainer != null && !suffix.getIncludeBranches().isEmpty())
- {
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- LockMode lockMode = LockMode.DEFAULT;
- logger.info(NOTE_JEB_IMPORT_MIGRATION_START, "existing", suffix.getBaseDN());
- Cursor cursor = entryContainer.getDN2ID().openCursor(null, null);
- try
- {
- final List<byte[]> includeBranches = includeBranchesAsBytes(suffix);
- OperationStatus status = cursor.getFirst(key, data, lockMode);
- while (status == OperationStatus.SUCCESS
- && !importConfiguration.isCancelled() && !isCanceled)
- {
- if (!find(includeBranches, key.getData()))
- {
- EntryID id = new EntryID(data);
- Entry entry = entryContainer.getID2Entry().get(null, id, LockMode.DEFAULT);
- processEntry(entry, rootContainer.getNextEntryID(), suffix);
- migratedCount++;
- status = cursor.getNext(key, data, lockMode);
- }
- else
- {
- // This is the base entry for a branch that will be included
- // in the import so we don't want to copy the branch to the
- // new entry container.
-
- /**
- * Advance the cursor to next entry at the same level in the DIT
- * skipping all the entries in this branch. Set the next
- * starting value to a value of equal length but slightly
- * greater than the previous DN. Since keys are compared in
- * reverse order we must set the first byte (the comma). No
- * possibility of overflow here.
- */
- byte[] begin = Arrays.copyOf(key.getData(), key.getSize() + 1);
- begin[begin.length - 1] = 0x01;
- key.setData(begin);
- status = cursor.getSearchKeyRange(key, data, lockMode);
- }
- }
- flushIndexBuffers();
- }
- catch (Exception e)
- {
- logger.error(ERR_JEB_IMPORT_LDIF_MIGRATE_EXISTING_TASK_ERR, e.getMessage());
- isCanceled = true;
- throw e;
- }
- finally
- {
- close(cursor);
- }
- }
- }
- return null;
- }
-
- private List<byte[]> includeBranchesAsBytes(Suffix suffix)
- {
- List<byte[]> includeBranches = new ArrayList<byte[]>(suffix.getIncludeBranches().size());
- for (DN includeBranch : suffix.getIncludeBranches())
- {
- if (includeBranch.isDescendantOf(suffix.getBaseDN()))
- {
- includeBranches.add(JebFormat.dnToDNKey(includeBranch, suffix.getBaseDN().size()));
- }
- }
- return includeBranches;
- }
-
- private boolean find(List<byte[]> arrays, byte[] arrayToFind)
- {
- for (byte[] array : arrays)
- {
- if (Arrays.equals(array, arrayToFind))
- {
- return true;
- }
- }
- return false;
- }
- }
-
- /**
- * Task to perform append/replace processing.
- */
- private class AppendReplaceTask extends ImportTask
- {
- private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
- private final Set<ByteString> deleteKeySet = new HashSet<ByteString>();
- private final EntryInformation entryInfo = new EntryInformation();
- private Entry oldEntry;
- private EntryID entryID;
-
- /** {@inheritDoc} */
- @Override
- public Void call() throws Exception
- {
- try
- {
- while (true)
- {
- if (importConfiguration.isCancelled() || isCanceled)
- {
- freeBufferQueue.add(IndexOutputBuffer.poison());
- return null;
- }
- oldEntry = null;
- Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
- if (entry == null)
- {
- break;
- }
- entryID = entryInfo.getEntryID();
- Suffix suffix = entryInfo.getSuffix();
- processEntry(entry, suffix);
- }
- flushIndexBuffers();
- }
- catch (Exception e)
- {
- logger.error(ERR_JEB_IMPORT_LDIF_APPEND_REPLACE_TASK_ERR, e.getMessage());
- isCanceled = true;
- throw e;
- }
- return null;
- }
-
- void processEntry(Entry entry, Suffix suffix)
- throws DatabaseException, DirectoryException, JebException, InterruptedException
- {
- DN entryDN = entry.getName();
- DN2ID dn2id = suffix.getDN2ID();
- EntryID oldID = dn2id.get(null, entryDN, LockMode.DEFAULT);
- if (oldID != null)
- {
- oldEntry = suffix.getID2Entry().get(null, oldID, LockMode.DEFAULT);
- }
- if (oldEntry == null)
- {
- if (!skipDNValidation && !dnSanityCheck(entryDN, entry, suffix))
- {
- suffix.removePending(entryDN);
- return;
- }
- suffix.removePending(entryDN);
- processDN2ID(suffix, entryDN, entryID);
- }
- else
- {
- suffix.removePending(entryDN);
- entryID = oldID;
- }
- processDN2URI(suffix, oldEntry, entry);
- suffix.getID2Entry().put(null, entryID, entry);
- if (oldEntry != null)
- {
- processAllIndexes(suffix, entry, entryID);
- }
- else
- {
- processIndexes(suffix, entry, entryID);
- }
- importCount.getAndIncrement();
- }
-
- void processAllIndexes(Suffix suffix, Entry entry, EntryID entryID)
- throws DatabaseException, DirectoryException, JebException, InterruptedException
- {
- for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
- {
- AttributeType attributeType = mapEntry.getKey();
- fillIndexKey(suffix, mapEntry, entry, attributeType, entryID);
- }
- }
-
- @Override
- void processAttribute(Index index, Entry entry, EntryID entryID, IndexingOptions options,
- IndexKey indexKey) throws DatabaseException, InterruptedException
- {
- if (oldEntry != null)
- {
- deleteKeySet.clear();
- index.indexer.indexEntry(oldEntry, deleteKeySet, options);
- for (ByteString delKey : deleteKeySet)
- {
- processKey(index, delKey.toByteArray(), entryID, indexComparator, indexKey, false);
- }
- }
- insertKeySet.clear();
- index.indexer.indexEntry(entry, insertKeySet, options);
- for (ByteString key : insertKeySet)
- {
- processKey(index, key.toByteArray(), entryID, indexComparator, indexKey, true);
- }
- }
- }
-
- /**
- * This task performs phase reading and processing of the entries read from
- * the LDIF file(s). This task is used if the append flag wasn't specified.
- */
- private class ImportTask implements Callable<Void>
- {
- private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<IndexKey, IndexOutputBuffer>();
- private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
- private final EntryInformation entryInfo = new EntryInformation();
- private DatabaseEntry keyEntry = new DatabaseEntry();
- private DatabaseEntry valEntry = new DatabaseEntry();
-
- /** {@inheritDoc} */
- @Override
- public Void call() throws Exception
- {
- try
- {
- while (true)
- {
- if (importConfiguration.isCancelled() || isCanceled)
- {
- freeBufferQueue.add(IndexOutputBuffer.poison());
- return null;
- }
- Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
- if (entry == null)
- {
- break;
- }
- EntryID entryID = entryInfo.getEntryID();
- Suffix suffix = entryInfo.getSuffix();
- processEntry(entry, entryID, suffix);
- }
- flushIndexBuffers();
- }
- catch (Exception e)
- {
- logger.error(ERR_JEB_IMPORT_LDIF_IMPORT_TASK_ERR, e.getMessage());
- isCanceled = true;
- throw e;
- }
- return null;
- }
-
- void processEntry(Entry entry, EntryID entryID, Suffix suffix)
- throws DatabaseException, DirectoryException, JebException, InterruptedException
- {
- DN entryDN = entry.getName();
- if (!skipDNValidation && !dnSanityCheck(entryDN, entry, suffix))
- {
- suffix.removePending(entryDN);
- return;
- }
- suffix.removePending(entryDN);
- processDN2ID(suffix, entryDN, entryID);
- processDN2URI(suffix, null, entry);
- processIndexes(suffix, entry, entryID);
- suffix.getID2Entry().put(null, entryID, entry);
- importCount.getAndIncrement();
- }
-
- /** Examine the DN for duplicates and missing parents. */
- boolean dnSanityCheck(DN entryDN, Entry entry, Suffix suffix)
- throws JebException, InterruptedException
- {
- //Perform parent checking.
- DN parentDN = suffix.getEntryContainer().getParentWithinBase(entryDN);
- if (parentDN != null && !suffix.isParentProcessed(parentDN, tmpEnv, clearedBackend))
- {
- reader.rejectEntry(entry, ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN));
- return false;
- }
- //If the backend was not cleared, then the dn2id needs to checked first
- //for DNs that might not exist in the DN cache. If the DN is not in
- //the suffixes dn2id DB, then the dn cache is used.
- if (!clearedBackend)
- {
- EntryID id = suffix.getDN2ID().get(null, entryDN, LockMode.DEFAULT);
- if (id != null || !tmpEnv.insert(entryDN, keyEntry, valEntry))
- {
- reader.rejectEntry(entry, WARN_JEB_IMPORT_ENTRY_EXISTS.get());
- return false;
- }
- }
- else if (!tmpEnv.insert(entryDN, keyEntry, valEntry))
- {
- reader.rejectEntry(entry, WARN_JEB_IMPORT_ENTRY_EXISTS.get());
- return false;
- }
- return true;
- }
-
- void processIndexes(Suffix suffix, Entry entry, EntryID entryID)
- throws DatabaseException, DirectoryException, JebException, InterruptedException
- {
- for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
- {
- AttributeType attributeType = mapEntry.getKey();
- if (entry.hasAttribute(attributeType))
- {
- fillIndexKey(suffix, mapEntry, entry, attributeType, entryID);
- }
- }
- }
-
- void fillIndexKey(Suffix suffix, Map.Entry<AttributeType, AttributeIndex> mapEntry, Entry entry,
- AttributeType attrType, EntryID entryID)
- throws DatabaseException, InterruptedException, DirectoryException, JebException
- {
- final AttributeIndex attrIndex = mapEntry.getValue();
- final IndexingOptions options = attrIndex.getIndexingOptions();
-
- processAttribute(attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, entry, attrType, entryID, options);
- processAttribute(attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, entry, attrType, entryID, options);
- processAttribute(attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, entry, attrType, entryID, options);
- processAttribute(attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, entry, attrType, entryID, options);
- processAttribute(attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, entry, attrType, entryID, options);
-
- for (VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes())
- {
- Transaction transaction = null;
- vlvIdx.addEntry(transaction, entryID, entry);
- }
- Map<String, Collection<Index>> extensibleMap = attrIndex.getExtensibleIndexes();
- if (!extensibleMap.isEmpty())
- {
- Collection<Index> subIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SUBSTRING);
- processAttributes(subIndexes, ImportIndexType.EX_SUBSTRING, entry, attrType, entryID, options);
- Collection<Index> sharedIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SHARED);
- processAttributes(sharedIndexes, ImportIndexType.EX_SHARED, entry, attrType, entryID, options);
- }
- }
-
- private void processAttribute(Index index, ImportIndexType presence, Entry entry,
- AttributeType attributeType, EntryID entryID, IndexingOptions options) throws InterruptedException
- {
- if (index != null)
- {
- IndexKey indexKey = new IndexKey(attributeType, presence, index.getIndexEntryLimit());
- processAttribute(index, entry, entryID, options, indexKey);
- }
- }
-
- private void processAttributes(Collection<Index> indexes, ImportIndexType indexType, Entry entry,
- AttributeType attributeType, EntryID entryID, IndexingOptions options) throws InterruptedException
- {
- if (indexes != null)
- {
- for (Index index : indexes)
- {
- IndexKey indexKey = new IndexKey(attributeType, indexType, index.getIndexEntryLimit());
- processAttribute(index, entry, entryID, options, indexKey);
- }
- }
- }
-
- void processAttribute(Index index, Entry entry, EntryID entryID, IndexingOptions options,
- IndexKey indexKey) throws DatabaseException, InterruptedException
- {
- insertKeySet.clear();
- index.indexer.indexEntry(entry, insertKeySet, options);
- for (ByteString key : insertKeySet)
- {
- processKey(index, key.toByteArray(), entryID, indexComparator, indexKey, true);
- }
- }
-
- void flushIndexBuffers() throws InterruptedException, ExecutionException
- {
- Set<Map.Entry<IndexKey, IndexOutputBuffer>> set = indexBufferMap.entrySet();
- Iterator<Map.Entry<IndexKey, IndexOutputBuffer>> setIterator = set.iterator();
- while (setIterator.hasNext())
- {
- Map.Entry<IndexKey, IndexOutputBuffer> e = setIterator.next();
- IndexKey indexKey = e.getKey();
- IndexOutputBuffer indexBuffer = e.getValue();
- setIterator.remove();
- indexBuffer.setComparator(indexComparator);
- indexBuffer.setIndexKey(indexKey);
- indexBuffer.discard();
- Future<Void> future = bufferSortService.submit(new SortTask(indexBuffer));
- future.get();
- }
- }
-
- int processKey(DatabaseContainer container, byte[] key, EntryID entryID,
- IndexOutputBuffer.ComparatorBuffer<byte[]> comparator,
- IndexKey indexKey, boolean insert) throws InterruptedException
- {
- int sizeNeeded = IndexOutputBuffer.getRequiredSize(key.length, entryID.longValue());
- IndexOutputBuffer indexBuffer = indexBufferMap.get(indexKey);
- if (indexBuffer == null)
- {
- indexBuffer = getNewIndexBuffer(sizeNeeded);
- indexBufferMap.put(indexKey, indexBuffer);
- }
- else if (!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
- {
- // complete the current buffer...
- indexBuffer.setComparator(comparator);
- indexBuffer.setIndexKey(indexKey);
- bufferSortService.submit(new SortTask(indexBuffer));
- // ... and get a new one
- indexBuffer = getNewIndexBuffer(sizeNeeded);
- indexBufferMap.put(indexKey, indexBuffer);
- }
- int id = System.identityHashCode(container);
- indexBuffer.add(key, entryID, id, insert);
- return id;
- }
-
- IndexOutputBuffer getNewIndexBuffer(int size) throws InterruptedException
- {
- IndexOutputBuffer indexBuffer;
- if (size > bufferSize)
- {
- indexBuffer = new IndexOutputBuffer(size);
- indexBuffer.discard();
- }
- else
- {
- indexBuffer = freeBufferQueue.take();
- if (indexBuffer == null)
- {
- throw new InterruptedException("Index buffer processing error.");
- }
- }
- if (indexBuffer.isPoison())
- {
- throw new InterruptedException("Cancel processing received.");
- }
- return indexBuffer;
- }
-
- void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
- throws InterruptedException
- {
- DN2ID dn2id = suffix.getDN2ID();
- byte[] dnBytes = JebFormat.dnToDNKey(dn, suffix.getBaseDN().size());
- IndexKey indexKey = new IndexKey(dnType, ImportIndexType.DN, 1);
- int id = processKey(dn2id, dnBytes, entryID, indexComparator, indexKey, true);
- idECMap.putIfAbsent(id, suffix.getEntryContainer());
- }
-
- void processDN2URI(Suffix suffix, Entry oldEntry, Entry newEntry)
- throws DatabaseException
- {
- DN2URI dn2uri = suffix.getDN2URI();
- if (oldEntry != null)
- {
- dn2uri.replaceEntry(null, oldEntry, newEntry);
- }
- else
- {
- dn2uri.addEntry(null, newEntry);
- }
- }
- }
-
- /**
- * This task reads sorted records from the temporary index scratch files,
- * processes the records and writes the results to the index database. The DN
- * index is treated differently then non-DN indexes.
- */
- private final class IndexDBWriteTask implements Callable<Void>
- {
- private final IndexManager indexMgr;
- private final DatabaseEntry dbKey, dbValue;
- private final int cacheSize;
- private final Map<Integer, DNState> dnStateMap = new HashMap<Integer, DNState>();
- private final Map<Integer, Index> indexMap = new HashMap<Integer, Index>();
- private final Semaphore permits;
- private final int maxPermits;
- private final AtomicLong bytesRead = new AtomicLong();
- private long lastBytesRead;
- private final AtomicInteger keyCount = new AtomicInteger();
- private RandomAccessFile bufferFile;
- private DataInputStream bufferIndexFile;
- private int remainingBuffers;
- private volatile int totalBatches;
- private AtomicInteger batchNumber = new AtomicInteger();
- private int nextBufferID;
- private int ownedPermits;
- private volatile boolean isRunning;
-
- /**
- * Creates a new index DB writer.
- *
- * @param indexMgr
- * The index manager.
- * @param permits
- * The semaphore used for restricting the number of buffer
- * allocations.
- * @param maxPermits
- * The maximum number of buffers which can be allocated.
- * @param cacheSize
- * The buffer cache size.
- */
- public IndexDBWriteTask(IndexManager indexMgr, Semaphore permits,
- int maxPermits, int cacheSize)
- {
- this.indexMgr = indexMgr;
- this.permits = permits;
- this.maxPermits = maxPermits;
- this.cacheSize = cacheSize;
-
- this.dbKey = new DatabaseEntry();
- this.dbValue = new DatabaseEntry();
- }
-
- /**
- * Initializes this task.
- *
- * @throws IOException
- * If an IO error occurred.
- */
- public void beginWriteTask() throws IOException
- {
- bufferFile = new RandomAccessFile(indexMgr.getBufferFile(), "r");
- bufferIndexFile =
- new DataInputStream(new BufferedInputStream(new FileInputStream(
- indexMgr.getBufferIndexFile())));
-
- remainingBuffers = indexMgr.getNumberOfBuffers();
- totalBatches = (remainingBuffers / maxPermits) + 1;
- batchNumber.set(0);
- nextBufferID = 0;
- ownedPermits = 0;
-
- logger.info(NOTE_JEB_IMPORT_LDIF_INDEX_STARTED, indexMgr.getBufferFileName(),
- remainingBuffers, totalBatches);
-
- indexMgr.setIndexDBWriteTask(this);
- isRunning = true;
- }
-
- /**
- * Returns the next batch of buffers to be processed, blocking until enough
- * buffer permits are available.
- *
- * @return The next batch of buffers, or {@code null} if there are no more
- * buffers to be processed.
- * @throws Exception
- * If an exception occurred.
- */
- public NavigableSet<IndexInputBuffer> getNextBufferBatch() throws Exception
- {
- // First release any previously acquired permits.
- if (ownedPermits > 0)
- {
- permits.release(ownedPermits);
- ownedPermits = 0;
- }
-
- // Block until we can either get enough permits for all buffers, or the
- // maximum number of permits.
- final int permitRequest = Math.min(remainingBuffers, maxPermits);
- if (permitRequest == 0)
- {
- // No more work to do.
- return null;
- }
- permits.acquire(permitRequest);
-
- // Update counters.
- ownedPermits = permitRequest;
- remainingBuffers -= permitRequest;
- batchNumber.incrementAndGet();
-
- // Create all the index buffers for the next batch.
- final NavigableSet<IndexInputBuffer> buffers = new TreeSet<IndexInputBuffer>();
- for (int i = 0; i < permitRequest; i++)
- {
- final long bufferBegin = bufferIndexFile.readLong();
- final long bufferEnd = bufferIndexFile.readLong();
- final IndexInputBuffer b =
- new IndexInputBuffer(indexMgr, bufferFile.getChannel(),
- bufferBegin, bufferEnd, nextBufferID++, cacheSize);
- buffers.add(b);
- }
-
- return buffers;
- }
-
- /**
- * Finishes this task.
- */
- public void endWriteTask()
- {
- isRunning = false;
-
- // First release any previously acquired permits.
- if (ownedPermits > 0)
- {
- permits.release(ownedPermits);
- ownedPermits = 0;
- }
-
- try
- {
- if (indexMgr.isDN2ID())
- {
- for (DNState dnState : dnStateMap.values())
- {
- dnState.flush();
- }
- if (!isCanceled)
- {
- logger.info(NOTE_JEB_IMPORT_LDIF_DN_CLOSE, indexMgr.getDNCount());
- }
- }
- else
- {
- if (!isCanceled)
- {
- logger.info(NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE, indexMgr.getBufferFileName());
- }
- }
- }
- finally
- {
- close(bufferFile, bufferIndexFile);
-
- indexMgr.getBufferFile().delete();
- indexMgr.getBufferIndexFile().delete();
- }
- }
-
- /**
- * Print out progress stats.
- *
- * @param deltaTime
- * The time since the last update.
- */
- public void printStats(long deltaTime)
- {
- if (isRunning)
- {
- final long bufferFileSize = indexMgr.getBufferFileSize();
- final long tmpBytesRead = bytesRead.get();
- final int currentBatch = batchNumber.get();
-
- final long bytesReadInterval = tmpBytesRead - lastBytesRead;
- final int bytesReadPercent =
- Math.round((100f * tmpBytesRead) / bufferFileSize);
-
- // Kilo and milli approximately cancel out.
- final long kiloBytesRate = bytesReadInterval / deltaTime;
- final long kiloBytesRemaining = (bufferFileSize - tmpBytesRead) / 1024;
-
- logger.info(NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT, indexMgr.getBufferFileName(),
- bytesReadPercent, kiloBytesRemaining, kiloBytesRate, currentBatch, totalBatches);
-
- lastBytesRead = tmpBytesRead;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public Void call() throws Exception, DirectoryException
- {
- ByteBuffer key = null;
- ImportIDSet insertIDSet = null;
- ImportIDSet deleteIDSet = null;
- Integer indexID = null;
-
- if (isCanceled)
- {
- return null;
- }
-
- try
- {
- beginWriteTask();
-
- NavigableSet<IndexInputBuffer> bufferSet;
- while ((bufferSet = getNextBufferBatch()) != null)
- {
- if (isCanceled)
- {
- return null;
- }
-
- while (!bufferSet.isEmpty())
- {
- IndexInputBuffer b = bufferSet.pollFirst();
- if (key == null)
- {
- indexID = b.getIndexID();
-
- if (indexMgr.isDN2ID())
- {
- insertIDSet = new ImportIDSet(1, 1, false);
- deleteIDSet = new ImportIDSet(1, 1, false);
- }
- else
- {
- Index index = idContainerMap.get(indexID);
- int limit = index.getIndexEntryLimit();
- boolean doCount = index.getMaintainCount();
- insertIDSet = new ImportIDSet(1, limit, doCount);
- deleteIDSet = new ImportIDSet(1, limit, doCount);
- }
-
- key = ByteBuffer.allocate(b.getKeyLen());
- key.flip();
- b.getKey(key);
-
- b.mergeIDSet(insertIDSet);
- b.mergeIDSet(deleteIDSet);
- insertIDSet.setKey(key);
- deleteIDSet.setKey(key);
- }
- else if (b.compare(key, indexID) != 0)
- {
- addToDB(insertIDSet, deleteIDSet, indexID);
- keyCount.incrementAndGet();
-
- indexID = b.getIndexID();
-
- if (indexMgr.isDN2ID())
- {
- insertIDSet = new ImportIDSet(1, 1, false);
- deleteIDSet = new ImportIDSet(1, 1, false);
- }
- else
- {
- Index index = idContainerMap.get(indexID);
- int limit = index.getIndexEntryLimit();
- boolean doCount = index.getMaintainCount();
- insertIDSet = new ImportIDSet(1, limit, doCount);
- deleteIDSet = new ImportIDSet(1, limit, doCount);
- }
-
- key.clear();
- if (b.getKeyLen() > key.capacity())
- {
- key = ByteBuffer.allocate(b.getKeyLen());
- }
- key.flip();
- b.getKey(key);
-
- b.mergeIDSet(insertIDSet);
- b.mergeIDSet(deleteIDSet);
- insertIDSet.setKey(key);
- deleteIDSet.setKey(key);
- }
- else
- {
- b.mergeIDSet(insertIDSet);
- b.mergeIDSet(deleteIDSet);
- }
-
- if (b.hasMoreData())
- {
- b.getNextRecord();
- bufferSet.add(b);
- }
- }
-
- if (key != null)
- {
- addToDB(insertIDSet, deleteIDSet, indexID);
- }
- }
- }
- catch (Exception e)
- {
- logger.error(ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR, indexMgr.getBufferFileName(), e.getMessage());
- throw e;
- }
- finally
- {
- endWriteTask();
- }
- return null;
- }
-
- private void addToDB(ImportIDSet insertSet, ImportIDSet deleteSet,
- int indexID) throws DirectoryException
- {
- if (!indexMgr.isDN2ID())
- {
- if (deleteSet.size() > 0 || !deleteSet.isDefined())
- {
- dbKey.setData(deleteSet.getKey().array(), 0, deleteSet.getKey().limit());
- final Index index = idContainerMap.get(indexID);
- index.delete(dbKey, deleteSet, dbValue);
- if (!indexMap.containsKey(indexID))
- {
- indexMap.put(indexID, index);
- }
- }
- if (insertSet.size() > 0 || !insertSet.isDefined())
- {
- dbKey.setData(insertSet.getKey().array(), 0, insertSet.getKey().limit());
- final Index index = idContainerMap.get(indexID);
- index.insert(dbKey, insertSet, dbValue);
- if (!indexMap.containsKey(indexID))
- {
- indexMap.put(indexID, index);
- }
- }
- }
- else
- {
- addDN2ID(insertSet, indexID);
- }
- }
-
- private void addDN2ID(ImportIDSet record, Integer indexID)
- throws DirectoryException
- {
- DNState dnState;
- if (!dnStateMap.containsKey(indexID))
- {
- dnState = new DNState(idECMap.get(indexID));
- dnStateMap.put(indexID, dnState);
- }
- else
- {
- dnState = dnStateMap.get(indexID);
- }
- if (dnState.checkParent(record))
- {
- dnState.writeToDB();
- }
- }
-
- private void addBytesRead(int bytesRead)
- {
- this.bytesRead.addAndGet(bytesRead);
- }
-
- /**
- * This class is used to by a index DB merge thread performing DN processing
- * to keep track of the state of individual DN2ID index processing.
- */
- class DNState
- {
- private static final int DN_STATE_CACHE_SIZE = 64 * KB;
-
- private ByteBuffer parentDN, lastDN;
- private EntryID parentID, lastID, entryID;
- private final DatabaseEntry dnKey, dnValue;
- private final TreeMap<ByteBuffer, EntryID> parentIDMap;
- private final EntryContainer entryContainer;
- private final Map<byte[], ImportIDSet> id2childTree;
- private final Map<byte[], ImportIDSet> id2subtreeTree;
- private final int childLimit, subTreeLimit;
- private final boolean childDoCount, subTreeDoCount;
-
- DNState(EntryContainer entryContainer)
- {
- this.entryContainer = entryContainer;
- parentIDMap = new TreeMap<ByteBuffer, EntryID>();
- Comparator<byte[]> childComparator =
- entryContainer.getID2Children().getComparator();
- id2childTree = new TreeMap<byte[], ImportIDSet>(childComparator);
- childLimit = entryContainer.getID2Children().getIndexEntryLimit();
- childDoCount = entryContainer.getID2Children().getMaintainCount();
- Comparator<byte[]> subComparator =
- entryContainer.getID2Subtree().getComparator();
- subTreeLimit = entryContainer.getID2Subtree().getIndexEntryLimit();
- subTreeDoCount = entryContainer.getID2Subtree().getMaintainCount();
- id2subtreeTree = new TreeMap<byte[], ImportIDSet>(subComparator);
- dnKey = new DatabaseEntry();
- dnValue = new DatabaseEntry();
- lastDN = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
- }
-
- private ByteBuffer getParent(ByteBuffer buffer)
- {
- int parentIndex = JebFormat.findDNKeyParent(buffer.array(), 0, buffer.limit());
- if (parentIndex < 0)
- {
- // This is the root or base DN
- return null;
- }
- ByteBuffer parent = buffer.duplicate();
- parent.limit(parentIndex);
- return parent;
- }
-
- private ByteBuffer deepCopy(ByteBuffer srcBuffer, ByteBuffer destBuffer)
- {
- if (destBuffer == null
- || destBuffer.clear().remaining() < srcBuffer.limit())
- {
- byte[] bytes = new byte[srcBuffer.limit()];
- System.arraycopy(srcBuffer.array(), 0, bytes, 0, srcBuffer.limit());
- return ByteBuffer.wrap(bytes);
- }
- else
- {
- destBuffer.put(srcBuffer);
- destBuffer.flip();
- return destBuffer;
- }
- }
-
- /** Why do we still need this if we are checking parents in the first phase? */
- private boolean checkParent(ImportIDSet record) throws DatabaseException
- {
- dnKey.setData(record.getKey().array(), 0, record.getKey().limit());
- byte[] v = record.toDatabase();
- long v1 = JebFormat.entryIDFromDatabase(v);
- dnValue.setData(v);
-
- entryID = new EntryID(v1);
- parentDN = getParent(record.getKey());
-
- //Bypass the cache for append data, lookup the parent in DN2ID and return.
- if (importConfiguration != null
- && importConfiguration.appendToExistingData())
- {
- //If null is returned than this is a suffix DN.
- if (parentDN != null)
- {
- DatabaseEntry key = new DatabaseEntry(parentDN.array(), 0, parentDN.limit());
- DatabaseEntry value = new DatabaseEntry();
- OperationStatus status = entryContainer.getDN2ID().read(null, key, value, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS)
- {
- parentID = new EntryID(value);
- }
- else
- {
- // We have a missing parent. Maybe parent checking was turned off?
- // Just ignore.
- parentID = null;
- return false;
- }
- }
- }
- else if (parentIDMap.isEmpty())
- {
- parentIDMap.put(deepCopy(record.getKey(), null), entryID);
- return true;
- }
- else if (lastDN != null && lastDN.equals(parentDN))
- {
- parentIDMap.put(deepCopy(lastDN, null), lastID);
- parentID = lastID;
- lastDN = deepCopy(record.getKey(), lastDN);
- lastID = entryID;
- return true;
- }
- else if (parentIDMap.lastKey().equals(parentDN))
- {
- parentID = parentIDMap.get(parentDN);
- lastDN = deepCopy(record.getKey(), lastDN);
- lastID = entryID;
- return true;
- }
- else if (parentIDMap.containsKey(parentDN))
- {
- EntryID newParentID = parentIDMap.get(parentDN);
- ByteBuffer key = parentIDMap.lastKey();
- while (!parentDN.equals(key))
- {
- parentIDMap.remove(key);
- key = parentIDMap.lastKey();
- }
- parentIDMap.put(deepCopy(record.getKey(), null), entryID);
- parentID = newParentID;
- lastDN = deepCopy(record.getKey(), lastDN);
- lastID = entryID;
- }
- else
- {
- // We have a missing parent. Maybe parent checking was turned off?
- // Just ignore.
- parentID = null;
- return false;
- }
- return true;
- }
-
- private void id2child(EntryID childID) throws DirectoryException
- {
- ImportIDSet idSet;
- if (parentID != null)
- {
- if (!id2childTree.containsKey(parentID.getDatabaseEntry().getData()))
- {
- idSet = new ImportIDSet(1, childLimit, childDoCount);
- id2childTree.put(parentID.getDatabaseEntry().getData(), idSet);
- }
- else
- {
- idSet = id2childTree.get(parentID.getDatabaseEntry().getData());
- }
- idSet.addEntryID(childID);
- if (id2childTree.size() > DN_STATE_CACHE_SIZE)
- {
- flushMapToDB(id2childTree, entryContainer.getID2Children(), true);
- }
- }
- else
- {
- throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION,
- ERR_PARENT_ENTRY_IS_MISSING.get());
- }
- }
-
- private EntryID getParentID(ByteBuffer dn) throws DatabaseException
- {
- // Bypass the cache for append data, lookup the parent DN in the DN2ID db
- if (importConfiguration == null || !importConfiguration.appendToExistingData())
- {
- return parentIDMap.get(dn);
- }
- DatabaseEntry key = new DatabaseEntry(dn.array(), 0, dn.limit());
- DatabaseEntry value = new DatabaseEntry();
- OperationStatus status = entryContainer.getDN2ID().read(null, key, value, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS)
- {
- return new EntryID(value);
- }
- return null;
- }
-
- private void id2SubTree(EntryID childID) throws DirectoryException
- {
- if (parentID != null)
- {
- ImportIDSet idSet;
- if (!id2subtreeTree.containsKey(parentID.getDatabaseEntry().getData()))
- {
- idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount);
- id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet);
- }
- else
- {
- idSet = id2subtreeTree.get(parentID.getDatabaseEntry().getData());
- }
- idSet.addEntryID(childID);
- // TODO:
- // Instead of doing this,
- // we can just walk to parent cache if available
- for (ByteBuffer dn = getParent(parentDN); dn != null; dn = getParent(dn))
- {
- EntryID nodeID = getParentID(dn);
- if (nodeID == null)
- {
- // We have a missing parent. Maybe parent checking was turned off?
- // Just ignore.
- break;
- }
- if (!id2subtreeTree.containsKey(nodeID.getDatabaseEntry().getData()))
- {
- idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount);
- id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet);
- }
- else
- {
- idSet = id2subtreeTree.get(nodeID.getDatabaseEntry().getData());
- }
- idSet.addEntryID(childID);
- }
- if (id2subtreeTree.size() > DN_STATE_CACHE_SIZE)
- {
- flushMapToDB(id2subtreeTree, entryContainer.getID2Subtree(), true);
- }
- }
- else
- {
- throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION,
- ERR_PARENT_ENTRY_IS_MISSING.get());
- }
- }
-
- public void writeToDB() throws DirectoryException
- {
- entryContainer.getDN2ID().put(null, dnKey, dnValue);
- indexMgr.addTotDNCount(1);
- if (parentDN != null)
- {
- id2child(entryID);
- id2SubTree(entryID);
- }
- }
-
- private void flushMapToDB(Map<byte[], ImportIDSet> map, Index index,
- boolean clearMap)
- {
- for (Map.Entry<byte[], ImportIDSet> e : map.entrySet())
- {
- byte[] key = e.getKey();
- ImportIDSet idSet = e.getValue();
- dnKey.setData(key);
- index.insert(dnKey, idSet, dnValue);
- }
- if (clearMap)
- {
- map.clear();
- }
- }
-
- public void flush()
- {
- flushMapToDB(id2childTree, entryContainer.getID2Children(), false);
- flushMapToDB(id2subtreeTree, entryContainer.getID2Subtree(), false);
- }
- }
- }
-
- /**
- * This task writes the temporary scratch index files using the sorted buffers
- * read from a blocking queue private to each index.
- */
- private final class ScratchFileWriterTask implements Callable<Void>
- {
- private final int DRAIN_TO = 3;
- private final IndexManager indexMgr;
- private final BlockingQueue<IndexOutputBuffer> queue;
- private final ByteArrayOutputStream insertByteStream = new ByteArrayOutputStream(2 * bufferSize);
- private final ByteArrayOutputStream deleteByteStream = new ByteArrayOutputStream(2 * bufferSize);
- private final DataOutputStream bufferStream;
- private final DataOutputStream bufferIndexStream;
- private final byte[] tmpArray = new byte[8];
- private final TreeSet<IndexOutputBuffer> indexSortedSet = new TreeSet<IndexOutputBuffer>();
- private int insertKeyCount, deleteKeyCount;
- private int bufferCount;
- private boolean poisonSeen;
-
- public ScratchFileWriterTask(BlockingQueue<IndexOutputBuffer> queue,
- IndexManager indexMgr) throws FileNotFoundException
- {
- this.queue = queue;
- this.indexMgr = indexMgr;
- this.bufferStream = newDataOutputStream(indexMgr.getBufferFile());
- this.bufferIndexStream = newDataOutputStream(indexMgr.getBufferIndexFile());
- }
-
- private DataOutputStream newDataOutputStream(File file) throws FileNotFoundException
- {
- return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), READER_WRITER_BUFFER_SIZE));
- }
-
- /** {@inheritDoc} */
- @Override
- public Void call() throws IOException, InterruptedException
- {
- long offset = 0;
- List<IndexOutputBuffer> l = new LinkedList<IndexOutputBuffer>();
- try
- {
- while (true)
- {
- final IndexOutputBuffer indexBuffer = queue.take();
- long beginOffset = offset;
- long bufferLen;
- if (!queue.isEmpty())
- {
- queue.drainTo(l, DRAIN_TO);
- l.add(indexBuffer);
- bufferLen = writeIndexBuffers(l);
- for (IndexOutputBuffer id : l)
- {
- if (!id.isDiscarded())
- {
- id.reset();
- freeBufferQueue.add(id);
- }
- }
- l.clear();
- }
- else
- {
- if (indexBuffer.isPoison())
- {
- break;
- }
- bufferLen = writeIndexBuffer(indexBuffer);
- if (!indexBuffer.isDiscarded())
- {
- indexBuffer.reset();
- freeBufferQueue.add(indexBuffer);
- }
- }
- offset += bufferLen;
-
- // Write buffer index information.
- bufferIndexStream.writeLong(beginOffset);
- bufferIndexStream.writeLong(offset);
-
- bufferCount++;
- Importer.this.bufferCount.incrementAndGet();
-
- if (poisonSeen)
- {
- break;
- }
- }
- }
- catch (IOException e)
- {
- logger.error(ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR,
- indexMgr.getBufferFile().getAbsolutePath(), e.getMessage());
- isCanceled = true;
- throw e;
- }
- finally
- {
- close(bufferStream, bufferIndexStream);
- indexMgr.setBufferInfo(bufferCount, indexMgr.getBufferFile().length());
- }
- return null;
- }
-
- private long writeIndexBuffer(IndexOutputBuffer indexBuffer) throws IOException
- {
- indexBuffer.setPosition(-1);
- resetStreams();
-
- long bufferLen = 0;
- final int numberKeys = indexBuffer.getNumberKeys();
- for (int i = 0; i < numberKeys; i++)
- {
- if (indexBuffer.getPosition() == -1)
- {
- indexBuffer.setPosition(i);
- insertOrDeleteKey(indexBuffer, i);
- continue;
- }
- if (!indexBuffer.compare(i))
- {
- bufferLen += writeRecord(indexBuffer);
- indexBuffer.setPosition(i);
- resetStreams();
- }
- insertOrDeleteKeyCheckEntryLimit(indexBuffer, i);
- }
- if (indexBuffer.getPosition() != -1)
- {
- bufferLen += writeRecord(indexBuffer);
- }
- return bufferLen;
- }
-
- private long writeIndexBuffers(List<IndexOutputBuffer> buffers) throws IOException
- {
- resetStreams();
-
- long id = 0;
- long bufferLen = 0;
- for (IndexOutputBuffer b : buffers)
- {
- if (b.isPoison())
- {
- poisonSeen = true;
- }
- else
- {
- b.setPosition(0);
- b.setID(id++);
- indexSortedSet.add(b);
- }
- }
- byte[] saveKey = null;
- int saveIndexID = 0;
- while (!indexSortedSet.isEmpty())
- {
- final IndexOutputBuffer b = indexSortedSet.pollFirst();
- if (saveKey == null)
- {
- saveKey = b.getKey();
- saveIndexID = b.getIndexID();
- insertOrDeleteKey(b, b.getPosition());
- }
- else if (!b.compare(saveKey, saveIndexID))
- {
- bufferLen += writeRecord(saveKey, saveIndexID);
- resetStreams();
- saveKey = b.getKey();
- saveIndexID = b.getIndexID();
- insertOrDeleteKey(b, b.getPosition());
- }
- else
- {
- insertOrDeleteKeyCheckEntryLimit(b, b.getPosition());
- }
- if (b.hasMoreData())
- {
- b.nextRecord();
- indexSortedSet.add(b);
- }
- }
- if (saveKey != null)
- {
- bufferLen += writeRecord(saveKey, saveIndexID);
- }
- return bufferLen;
- }
-
- private void resetStreams()
- {
- insertByteStream.reset();
- insertKeyCount = 0;
- deleteByteStream.reset();
- deleteKeyCount = 0;
- }
-
- private void insertOrDeleteKey(IndexOutputBuffer indexBuffer, int i)
- {
- if (indexBuffer.isInsertRecord(i))
- {
- indexBuffer.writeID(insertByteStream, i);
- insertKeyCount++;
- }
- else
- {
- indexBuffer.writeID(deleteByteStream, i);
- deleteKeyCount++;
- }
- }
-
- private void insertOrDeleteKeyCheckEntryLimit(IndexOutputBuffer indexBuffer, int i)
- {
- if (indexBuffer.isInsertRecord(i))
- {
- if (insertKeyCount++ <= indexMgr.getLimit())
- {
- indexBuffer.writeID(insertByteStream, i);
- }
- }
- else
- {
- indexBuffer.writeID(deleteByteStream, i);
- deleteKeyCount++;
- }
- }
-
- private int writeByteStreams() throws IOException
- {
- if (insertKeyCount > indexMgr.getLimit())
- {
- insertKeyCount = 1;
- insertByteStream.reset();
- PackedInteger.writeInt(tmpArray, 0, -1);
- insertByteStream.write(tmpArray, 0, 1);
- }
- int insertSize = PackedInteger.getWriteIntLength(insertKeyCount);
- PackedInteger.writeInt(tmpArray, 0, insertKeyCount);
- bufferStream.write(tmpArray, 0, insertSize);
- if (insertByteStream.size() > 0)
- {
- insertByteStream.writeTo(bufferStream);
- }
- int deleteSize = PackedInteger.getWriteIntLength(deleteKeyCount);
- PackedInteger.writeInt(tmpArray, 0, deleteKeyCount);
- bufferStream.write(tmpArray, 0, deleteSize);
- if (deleteByteStream.size() > 0)
- {
- deleteByteStream.writeTo(bufferStream);
- }
- return insertSize + deleteSize;
- }
-
- private int writeHeader(int indexID, int keySize) throws IOException
- {
- bufferStream.writeInt(indexID);
- int packedSize = PackedInteger.getWriteIntLength(keySize);
- PackedInteger.writeInt(tmpArray, 0, keySize);
- bufferStream.write(tmpArray, 0, packedSize);
- return packedSize;
- }
-
- private int writeRecord(IndexOutputBuffer b) throws IOException
- {
- int keySize = b.getKeySize();
- int packedSize = writeHeader(b.getIndexID(), keySize);
- b.writeKey(bufferStream);
- packedSize += writeByteStreams();
- return packedSize + keySize + insertByteStream.size() + deleteByteStream.size() + 4;
- }
-
- private int writeRecord(byte[] k, int indexID) throws IOException
- {
- int packedSize = writeHeader(indexID, k.length);
- bufferStream.write(k);
- packedSize += writeByteStreams();
- return packedSize + k.length + insertByteStream.size() + deleteByteStream.size() + 4;
- }
- }
-
- /**
- * This task main function is to sort the index buffers given to it from the
- * import tasks reading the LDIF file. It will also create a index file writer
- * task and corresponding queue if needed. The sorted index buffers are put on
- * the index file writer queues for writing to a temporary file.
- */
- private final class SortTask implements Callable<Void>
- {
-
- private final IndexOutputBuffer indexBuffer;
-
- public SortTask(IndexOutputBuffer indexBuffer)
- {
- this.indexBuffer = indexBuffer;
- }
-
- /** {@inheritDoc} */
- @Override
- public Void call() throws Exception
- {
- if ((importConfiguration != null && importConfiguration.isCancelled())
- || isCanceled)
- {
- isCanceled = true;
- return null;
- }
- indexBuffer.sort();
- if (!indexKeyQueueMap.containsKey(indexBuffer.getIndexKey()))
- {
- createIndexWriterTask(indexBuffer.getIndexKey());
- }
- indexKeyQueueMap.get(indexBuffer.getIndexKey()).add(indexBuffer);
- return null;
- }
-
- private void createIndexWriterTask(IndexKey indexKey) throws FileNotFoundException
- {
- synchronized (synObj)
- {
- if (indexKeyQueueMap.containsKey(indexKey))
- {
- return;
- }
- boolean isDN2ID = ImportIndexType.DN.equals(indexKey.getIndexType());
- IndexManager indexMgr = new IndexManager(indexKey.getName(), isDN2ID, indexKey.getEntryLimit());
- if (isDN2ID)
- {
- DNIndexMgrList.add(indexMgr);
- }
- else
- {
- indexMgrList.add(indexMgr);
- }
- BlockingQueue<IndexOutputBuffer> newQueue =
- new ArrayBlockingQueue<IndexOutputBuffer>(phaseOneBufferCount);
- ScratchFileWriterTask indexWriter = new ScratchFileWriterTask(newQueue, indexMgr);
- scratchFileWriterList.add(indexWriter);
- scratchFileWriterFutures.add(scratchFileWriterService.submit(indexWriter));
- indexKeyQueueMap.put(indexKey, newQueue);
- }
- }
- }
-
- /**
- * The index manager class has several functions:
- * <ol>
- * <li>It is used to carry information about index processing created in phase one to phase two</li>
- * <li>It collects statistics about phase two processing for each index</li>
- * <li>It manages opening and closing the scratch index files</li>
- * </ol>
- */
- final class IndexManager implements Comparable<IndexManager>
- {
- private final File bufferFile;
- private final String bufferFileName;
- private final File bufferIndexFile;
- private final boolean isDN2ID;
- private final int limit;
-
- private int numberOfBuffers;
- private long bufferFileSize;
- private long totalDNs;
- private volatile IndexDBWriteTask writer;
-
- private IndexManager(String fileName, boolean isDN2ID, int limit)
- {
- this.bufferFileName = fileName;
- this.bufferFile = new File(tempDir, bufferFileName);
- this.bufferIndexFile = new File(tempDir, bufferFileName + ".index");
-
- this.isDN2ID = isDN2ID;
- this.limit = limit > 0 ? limit : Integer.MAX_VALUE;
- }
-
- private void setIndexDBWriteTask(IndexDBWriteTask writer)
- {
- this.writer = writer;
- }
-
- private File getBufferFile()
- {
- return bufferFile;
- }
-
- private long getBufferFileSize()
- {
- return bufferFileSize;
- }
-
- private File getBufferIndexFile()
- {
- return bufferIndexFile;
- }
-
- private void setBufferInfo(int numberOfBuffers, long bufferFileSize)
- {
- this.numberOfBuffers = numberOfBuffers;
- this.bufferFileSize = bufferFileSize;
- }
-
- /**
- * Updates the bytes read counter.
- *
- * @param bytesRead
- * The number of bytes read.
- */
- void addBytesRead(int bytesRead)
- {
- if (writer != null)
- {
- writer.addBytesRead(bytesRead);
- }
- }
-
- private void addTotDNCount(int delta)
- {
- totalDNs += delta;
- }
-
- private long getDNCount()
- {
- return totalDNs;
- }
-
- private boolean isDN2ID()
- {
- return isDN2ID;
- }
-
- private void printStats(long deltaTime)
- {
- if (writer != null)
- {
- writer.printStats(deltaTime);
- }
- }
-
- /**
- * Returns the file name associated with this index manager.
- *
- * @return The file name associated with this index manager.
- */
- String getBufferFileName()
- {
- return bufferFileName;
- }
-
- private int getLimit()
- {
- return limit;
- }
-
- /** {@inheritDoc} */
- @Override
- public int compareTo(IndexManager mgr)
- {
- return numberOfBuffers - mgr.numberOfBuffers;
- }
-
- private int getNumberOfBuffers()
- {
- return numberOfBuffers;
- }
- }
-
- /**
- * The rebuild index manager handles all rebuild index related processing.
- */
- private class RebuildIndexManager extends ImportTask implements
- DiskSpaceMonitorHandler
- {
-
- /** Rebuild index configuration. */
- private final RebuildConfig rebuildConfig;
-
- /** Local DB backend configuration. */
- private final LocalDBBackendCfg cfg;
-
- /** Map of index keys to indexes. */
- private final Map<IndexKey, Index> indexMap =
- new LinkedHashMap<IndexKey, Index>();
-
- /** Map of index keys to extensible indexes. */
- private final Map<IndexKey, Collection<Index>> extensibleIndexMap =
- new LinkedHashMap<IndexKey, Collection<Index>>();
-
- /** List of VLV indexes. */
- private final List<VLVIndex> vlvIndexes = new LinkedList<VLVIndex>();
-
- /** The DN2ID index. */
- private DN2ID dn2id;
-
- /** The DN2URI index. */
- private DN2URI dn2uri;
-
- /** Total entries to be processed. */
- private long totalEntries;
-
- /** Total entries processed. */
- private final AtomicLong entriesProcessed = new AtomicLong(0);
-
- /** The suffix instance. */
- private Suffix suffix;
-
- /** The entry container. */
- private EntryContainer entryContainer;
-
- /**
- * Create an instance of the rebuild index manager using the specified
- * parameters.
- *
- * @param rebuildConfig
- * The rebuild configuration to use.
- * @param cfg
- * The local DB configuration to use.
- */
- public RebuildIndexManager(RebuildConfig rebuildConfig,
- LocalDBBackendCfg cfg)
- {
- this.rebuildConfig = rebuildConfig;
- this.cfg = cfg;
- }
-
- /**
- * Initialize a rebuild index manager.
- *
- * @throws ConfigException
- * If an configuration error occurred.
- * @throws InitializationException
- * If an initialization error occurred.
- */
- public void initialize() throws ConfigException, InitializationException
- {
- entryContainer = rootContainer.getEntryContainer(rebuildConfig.getBaseDN());
- suffix = new Suffix(entryContainer, null, null, null);
- if (suffix == null)
- {
- throw new InitializationException(
- ERR_JEB_REBUILD_SUFFIX_ERROR.get(rebuildConfig.getBaseDN()));
- }
- }
-
- /**
- * Print start message.
- *
- * @throws DatabaseException
- * If an database error occurred.
- */
- public void printStartMessage() throws DatabaseException
- {
- totalEntries = suffix.getID2Entry().getRecordCount();
-
- switch (rebuildConfig.getRebuildMode())
- {
- case ALL:
- logger.info(NOTE_JEB_REBUILD_ALL_START, totalEntries);
- break;
- case DEGRADED:
- logger.info(NOTE_JEB_REBUILD_DEGRADED_START, totalEntries);
- break;
- default:
- if (!rebuildConfig.isClearDegradedState()
- && logger.isInfoEnabled())
- {
- String indexes = Utils.joinAsString(", ", rebuildConfig.getRebuildList());
- logger.info(NOTE_JEB_REBUILD_START, indexes, totalEntries);
- }
- break;
- }
- }
-
- /**
- * Print stop message.
- *
- * @param startTime
- * The time the rebuild started.
- */
- public void printStopMessage(long startTime)
- {
- long finishTime = System.currentTimeMillis();
- long totalTime = finishTime - startTime;
- float rate = 0;
- if (totalTime > 0)
- {
- rate = 1000f * entriesProcessed.get() / totalTime;
- }
-
- if (!rebuildConfig.isClearDegradedState())
- {
- logger.info(NOTE_JEB_REBUILD_FINAL_STATUS, entriesProcessed.get(), totalTime / 1000, rate);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public Void call() throws Exception
- {
- ID2Entry id2entry = entryContainer.getID2Entry();
- DiskOrderedCursor cursor =
- id2entry.openCursor(DiskOrderedCursorConfig.DEFAULT);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- try
- {
- while (cursor.getNext(key, data, null) == OperationStatus.SUCCESS)
- {
- if (isCanceled)
- {
- return null;
- }
- EntryID entryID = new EntryID(key);
- Entry entry =
- ID2Entry.entryFromDatabase(ByteString.wrap(data.getData()),
- entryContainer.getRootContainer().getCompressedSchema());
- processEntry(entry, entryID);
- entriesProcessed.getAndIncrement();
- }
- flushIndexBuffers();
- }
- catch (Exception e)
- {
- logger.traceException(e);
- logger.error(ERR_JEB_IMPORT_LDIF_REBUILD_INDEX_TASK_ERR, stackTraceToSingleLineString(e));
- isCanceled = true;
- throw e;
- }
- finally
- {
- close(cursor);
- }
- return null;
- }
-
- /**
- * Perform rebuild index processing.
- *
- * @throws DatabaseException
- * If an database error occurred.
- * @throws InterruptedException
- * If an interrupted error occurred.
- * @throws ExecutionException
- * If an Execution error occurred.
- * @throws JebException
- * If an JEB error occurred.
- */
- public void rebuildIndexes() throws DatabaseException,
- InterruptedException, ExecutionException, JebException
- {
- // Sets only the needed indexes.
- setIndexesListsToBeRebuilt();
-
- if (!rebuildConfig.isClearDegradedState())
- {
- // If not in a 'clear degraded state' operation,
- // need to rebuild the indexes.
- setRebuildListIndexesTrusted(false);
- clearIndexes(true);
- phaseOne();
- if (isCanceled)
- {
- throw new InterruptedException("Rebuild Index canceled.");
- }
- phaseTwo();
- }
- else
- {
- logger.info(NOTE_JEB_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
- }
-
- setRebuildListIndexesTrusted(true);
- }
-
- @SuppressWarnings("fallthrough")
- private void setIndexesListsToBeRebuilt() throws JebException
- {
- // Depends on rebuild mode, (re)building indexes' lists.
- final RebuildMode mode = rebuildConfig.getRebuildMode();
- switch (mode)
- {
- case ALL:
- rebuildIndexMap(false);
- // falls through
- case DEGRADED:
- if (mode == RebuildMode.ALL
- || !entryContainer.getID2Children().isTrusted()
- || !entryContainer.getID2Subtree().isTrusted())
- {
- dn2id = entryContainer.getDN2ID();
- }
- if (mode == RebuildMode.ALL || entryContainer.getDN2URI() == null)
- {
- dn2uri = entryContainer.getDN2URI();
- }
- if (mode == RebuildMode.DEGRADED
- || entryContainer.getAttributeIndexes().isEmpty())
- {
- rebuildIndexMap(true); // only degraded.
- }
- if (mode == RebuildMode.ALL || vlvIndexes.isEmpty())
- {
- vlvIndexes.addAll(new LinkedList<VLVIndex>(entryContainer.getVLVIndexes()));
- }
- break;
-
- case USER_DEFINED:
- // false may be required if the user wants to rebuild specific index.
- rebuildIndexMap(false);
- break;
- default:
- break;
- }
- }
-
- private void rebuildIndexMap(final boolean onlyDegraded)
- {
- // rebuildList contains the user-selected index(in USER_DEFINED mode).
- final List<String> rebuildList = rebuildConfig.getRebuildList();
- for (final Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
- {
- final AttributeType attributeType = mapEntry.getKey();
- final AttributeIndex attributeIndex = mapEntry.getValue();
- if (rebuildConfig.getRebuildMode() == RebuildMode.ALL
- || rebuildConfig.getRebuildMode() == RebuildMode.DEGRADED)
- {
- // Get all existing indexes for all && degraded mode.
- rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
- }
- else if (!rebuildList.isEmpty())
- {
- // Get indexes for user defined index.
- for (final String index : rebuildList)
- {
- if (attributeType.getNameOrOID().toLowerCase().equals(index.toLowerCase()))
- {
- rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
- }
- }
- }
- }
- }
-
- private void rebuildAttributeIndexes(final AttributeIndex attrIndex,
- final AttributeType attrType, final boolean onlyDegraded)
- throws DatabaseException
- {
- fillIndexMap(attrType, attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, onlyDegraded);
- fillIndexMap(attrType, attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, onlyDegraded);
- fillIndexMap(attrType, attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, onlyDegraded);
- fillIndexMap(attrType, attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, onlyDegraded);
- fillIndexMap(attrType, attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, onlyDegraded);
-
- final Map<String, Collection<Index>> extensibleMap = attrIndex.getExtensibleIndexes();
- if (!extensibleMap.isEmpty())
- {
- final Collection<Index> subIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SUBSTRING);
- fillIndexMap(attrType, subIndexes, ImportIndexType.EX_SUBSTRING, onlyDegraded);
- final Collection<Index> sharedIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SHARED);
- fillIndexMap(attrType, sharedIndexes, ImportIndexType.EX_SHARED, onlyDegraded);
- }
- }
-
- private void fillIndexMap(final AttributeType attrType, final Collection<Index> indexes,
- final ImportIndexType importIndexType, final boolean onlyDegraded)
- {
- if (indexes != null && !indexes.isEmpty())
- {
- final List<Index> mutableCopy = new LinkedList<Index>(indexes);
- for (final Iterator<Index> it = mutableCopy.iterator(); it.hasNext();)
- {
- final Index sharedIndex = it.next();
- if (!onlyDegraded || !sharedIndex.isTrusted())
- {
- if (!rebuildConfig.isClearDegradedState() || sharedIndex.getRecordCount() == 0)
- {
- putInIdContainerMap(sharedIndex);
- }
- }
- else
- {
- // This index is not a candidate for rebuilding.
- it.remove();
- }
- }
- if (!mutableCopy.isEmpty())
- {
- extensibleIndexMap.put(new IndexKey(attrType, importIndexType, 0), mutableCopy);
- }
- }
- }
-
- private void fillIndexMap(final AttributeType attrType, final Index partialAttrIndex,
- final ImportIndexType importIndexType, final boolean onlyDegraded)
- {
- if (partialAttrIndex != null
- && (!onlyDegraded || !partialAttrIndex.isTrusted())
- && (!rebuildConfig.isClearDegradedState() || partialAttrIndex.getRecordCount() == 0))
- {
- putInIdContainerMap(partialAttrIndex);
- final IndexKey indexKey = new IndexKey(attrType, importIndexType, partialAttrIndex.getIndexEntryLimit());
- indexMap.put(indexKey, partialAttrIndex);
- }
- }
-
- private void clearIndexes(boolean onlyDegraded) throws DatabaseException
- {
- // Clears all the entry's container databases which are containing the indexes
- if (!onlyDegraded)
- {
- // dn2uri does not have a trusted status.
- entryContainer.clearDatabase(entryContainer.getDN2URI());
- }
-
- if (!onlyDegraded
- || !entryContainer.getID2Children().isTrusted()
- || !entryContainer.getID2Subtree().isTrusted())
- {
- entryContainer.clearDatabase(entryContainer.getDN2ID());
- entryContainer.clearDatabase(entryContainer.getID2Children());
- entryContainer.clearDatabase(entryContainer.getID2Subtree());
- }
-
- if (!indexMap.isEmpty())
- {
- for (final Map.Entry<IndexKey, Index> mapEntry : indexMap.entrySet())
- {
- if (!onlyDegraded || !mapEntry.getValue().isTrusted())
- {
- entryContainer.clearDatabase(mapEntry.getValue());
- }
- }
- }
-
- if (!extensibleIndexMap.isEmpty())
- {
- for (final Collection<Index> subIndexes : extensibleIndexMap.values())
- {
- if (subIndexes != null)
- {
- for (final Index subIndex : subIndexes)
- {
- entryContainer.clearDatabase(subIndex);
- }
- }
- }
- }
-
- for (final VLVIndex vlvIndex : entryContainer.getVLVIndexes())
- {
- if (!onlyDegraded || !vlvIndex.isTrusted())
- {
- entryContainer.clearDatabase(vlvIndex);
- }
- }
- }
-
- private void setRebuildListIndexesTrusted(boolean trusted)
- throws JebException
- {
- try
- {
- if (dn2id != null)
- {
- EntryContainer ec = suffix.getEntryContainer();
- ec.getID2Children().setTrusted(null, trusted);
- ec.getID2Subtree().setTrusted(null, trusted);
- }
- if (!indexMap.isEmpty())
- {
- for (Map.Entry<IndexKey, Index> mapEntry : indexMap.entrySet())
- {
- Index index = mapEntry.getValue();
- index.setTrusted(null, trusted);
- }
- }
- if (!vlvIndexes.isEmpty())
- {
- for (VLVIndex vlvIndex : vlvIndexes)
- {
- vlvIndex.setTrusted(null, trusted);
- }
- }
- if (!extensibleIndexMap.isEmpty())
- {
- for (Collection<Index> subIndexes : extensibleIndexMap.values())
- {
- if (subIndexes != null)
- {
- for (Index subIndex : subIndexes)
- {
- subIndex.setTrusted(null, trusted);
- }
- }
- }
- }
- }
- catch (DatabaseException ex)
- {
- throw new JebException(NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage()));
- }
- }
-
- private void phaseOne() throws DatabaseException, InterruptedException,
- ExecutionException
- {
- initializeIndexBuffers();
- Timer timer = scheduleAtFixedRate(new RebuildFirstPhaseProgressTask());
- scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
- bufferSortService = Executors.newFixedThreadPool(threadCount);
- ExecutorService rebuildIndexService = Executors.newFixedThreadPool(threadCount);
- List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
- for (int i = 0; i < threadCount; i++)
- {
- tasks.add(this);
- }
- List<Future<Void>> results = rebuildIndexService.invokeAll(tasks);
- getAll(results);
- stopScratchFileWriters();
- getAll(scratchFileWriterFutures);
-
- // Try to clear as much memory as possible.
- shutdownAll(rebuildIndexService, bufferSortService, scratchFileWriterService);
- timer.cancel();
-
- clearAll(tasks, results, scratchFileWriterList, scratchFileWriterFutures, freeBufferQueue);
- indexKeyQueueMap.clear();
- }
-
- private void phaseTwo() throws InterruptedException, ExecutionException
- {
- final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask(entriesProcessed.get()));
- try
- {
- processIndexFiles();
- }
- finally
- {
- timer.cancel();
- }
- }
-
- private Timer scheduleAtFixedRate(TimerTask task)
- {
- final Timer timer = new Timer();
- timer.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL);
- return timer;
- }
-
- private int getIndexCount() throws ConfigException, JebException,
- InitializationException
- {
- switch (rebuildConfig.getRebuildMode())
- {
- case ALL:
- return getTotalIndexCount(cfg);
- case DEGRADED:
- // FIXME: since the environment is not started we cannot determine which
- // indexes are degraded. As a workaround, be conservative and assume all
- // indexes need rebuilding.
- return getTotalIndexCount(cfg);
- default:
- return getRebuildListIndexCount(cfg);
- }
- }
-
- private int getRebuildListIndexCount(LocalDBBackendCfg cfg)
- throws JebException, ConfigException, InitializationException
- {
- final List<String> rebuildList = rebuildConfig.getRebuildList();
- if (rebuildList.isEmpty())
- {
- return 0;
- }
-
- int indexCount = 0;
- for (String index : rebuildList)
- {
- final String lowerName = index.toLowerCase();
- if ("dn2id".equals(lowerName))
- {
- indexCount += 3;
- }
- else if ("dn2uri".equals(lowerName))
- {
- indexCount++;
- }
- else if (lowerName.startsWith("vlv."))
- {
- if (lowerName.length() < 5)
- {
- throw new JebException(ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName));
- }
- indexCount++;
- }
- else if ("id2subtree".equals(lowerName)
- || "id2children".equals(lowerName))
- {
- throw attributeIndexNotConfigured(index);
- }
- else
- {
- final String[] attrIndexParts = lowerName.split("\\.");
- if (attrIndexParts.length <= 0 || attrIndexParts.length > 3)
- {
- throw attributeIndexNotConfigured(index);
- }
- AttributeType attrType = DirectoryServer.getAttributeType(attrIndexParts[0]);
- if (attrType == null)
- {
- throw attributeIndexNotConfigured(index);
- }
- if (attrIndexParts.length != 1)
- {
- final String indexType = attrIndexParts[1];
- if (attrIndexParts.length == 2)
- {
- if ("presence".equals(indexType)
- || "equality".equals(indexType)
- || "ordering".equals(indexType)
- || "substring".equals(indexType)
- || "approximate".equals(indexType))
- {
- indexCount++;
- }
- else
- {
- throw attributeIndexNotConfigured(index);
- }
- }
- else // attrIndexParts.length == 3
- {
- if (!findExtensibleMatchingRule(cfg, indexType + "." + attrIndexParts[2]))
- {
- throw attributeIndexNotConfigured(index);
- }
- indexCount++;
- }
- }
- else
- {
- boolean found = false;
- for (final String idx : cfg.listLocalDBIndexes())
- {
- if (idx.equalsIgnoreCase(index))
- {
- found = true;
- final LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
- indexCount += getAttributeIndexCount(indexCfg.getIndexType(),
- PRESENCE, EQUALITY, ORDERING, SUBSTRING, APPROXIMATE);
- indexCount += getExtensibleIndexCount(indexCfg);
- }
- }
- if (!found)
- {
- throw attributeIndexNotConfigured(index);
- }
- }
- }
- }
- return indexCount;
- }
-
- private InitializationException attributeIndexNotConfigured(String index)
- {
- return new InitializationException(ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index));
- }
-
- private boolean findExtensibleMatchingRule(LocalDBBackendCfg cfg, String indexExRuleName) throws ConfigException
- {
- for (String idx : cfg.listLocalDBIndexes())
- {
- LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
- if (indexCfg.getIndexType().contains(EXTENSIBLE))
- {
- for (String exRule : indexCfg.getIndexExtensibleMatchingRule())
- {
- if (exRule.equalsIgnoreCase(indexExRuleName))
- {
- return true;
- }
- }
- }
- }
- return false;
- }
-
- private int getAttributeIndexCount(SortedSet<IndexType> indexTypes, IndexType... toFinds)
- {
- int result = 0;
- for (IndexType toFind : toFinds)
- {
- if (indexTypes.contains(toFind))
- {
- result++;
- }
- }
- return result;
- }
-
- private int getExtensibleIndexCount(LocalDBIndexCfg indexCfg)
- {
- int result = 0;
- if (indexCfg.getIndexType().contains(EXTENSIBLE))
- {
- boolean shared = false;
- for (final String exRule : indexCfg.getIndexExtensibleMatchingRule())
- {
- if (exRule.endsWith(".sub"))
- {
- result++;
- }
- else if (!shared)
- {
- shared = true;
- result++;
- }
- }
- }
- return result;
- }
-
- private void processEntry(Entry entry, EntryID entryID)
- throws DatabaseException, DirectoryException, JebException,
- InterruptedException
- {
- if (dn2id != null)
- {
- processDN2ID(suffix, entry.getName(), entryID);
- }
- if (dn2uri != null)
- {
- processDN2URI(suffix, null, entry);
- }
- processIndexes(entry, entryID);
- processExtensibleIndexes(entry, entryID);
- processVLVIndexes(entry, entryID);
- }
-
- private void processVLVIndexes(Entry entry, EntryID entryID)
- throws DatabaseException, JebException, DirectoryException
- {
- for (VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes())
- {
- Transaction transaction = null;
- vlvIdx.addEntry(transaction, entryID, entry);
- }
- }
-
- private void processExtensibleIndexes(Entry entry, EntryID entryID)
- throws InterruptedException
- {
- for (Map.Entry<IndexKey, Collection<Index>> mapEntry :
- this.extensibleIndexMap.entrySet())
- {
- IndexKey key = mapEntry.getKey();
- AttributeType attrType = key.getAttributeType();
- if (entry.hasAttribute(attrType))
- {
- AttributeIndex attributeIndex = entryContainer.getAttributeIndex(attrType);
- IndexingOptions options = attributeIndex.getIndexingOptions();
- for (Index index : mapEntry.getValue())
- {
- processAttribute(index, entry, entryID, options, key);
- }
- }
- }
- }
-
- private void processIndexes(Entry entry, EntryID entryID)
- throws DatabaseException, InterruptedException
- {
- for (Map.Entry<IndexKey, Index> mapEntry : indexMap.entrySet())
- {
- IndexKey key = mapEntry.getKey();
- AttributeType attrType = key.getAttributeType();
- if (entry.hasAttribute(attrType))
- {
- AttributeIndex attributeIndex = entryContainer.getAttributeIndex(attrType);
- IndexingOptions options = attributeIndex.getIndexingOptions();
- Index index = mapEntry.getValue();
- processAttribute(index, entry, entryID, options,
- new IndexKey(attrType, key.getIndexType(), index.getIndexEntryLimit()));
- }
- }
- }
-
- /**
- * Return the number of entries processed by the rebuild manager.
- *
- * @return The number of entries processed.
- */
- public long getEntriesProcess()
- {
- return this.entriesProcessed.get();
- }
-
- /**
- * Return the total number of entries to process by the rebuild manager.
- *
- * @return The total number for entries to process.
- */
- public long getTotalEntries()
- {
- return this.totalEntries;
- }
-
- @Override
- public void diskLowThresholdReached(DiskSpaceMonitor monitor)
- {
- diskFullThresholdReached(monitor);
- }
-
- @Override
- public void diskFullThresholdReached(DiskSpaceMonitor monitor)
- {
- isCanceled = true;
- logger.error(ERR_REBUILD_INDEX_LACK_DISK, monitor.getDirectory().getPath(),
- monitor.getFreeSpace(), monitor.getLowThreshold());
- }
-
- @Override
- public void diskSpaceRestored(DiskSpaceMonitor monitor)
- {
- // Do nothing
- }
- }
-
- /**
- * This class reports progress of rebuild index processing at fixed intervals.
- */
- private class RebuildFirstPhaseProgressTask extends TimerTask
- {
- /**
- * The number of records that had been processed at the time of the previous
- * progress report.
- */
- private long previousProcessed;
- /** The time in milliseconds of the previous progress report. */
- private long previousTime;
- /** The environment statistics at the time of the previous report. */
- private EnvironmentStats prevEnvStats;
-
- /**
- * Create a new rebuild index progress task.
- *
- * @throws DatabaseException
- * If an error occurred while accessing the JE database.
- */
- public RebuildFirstPhaseProgressTask() throws DatabaseException
- {
- previousTime = System.currentTimeMillis();
- prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig());
- }
-
- /**
- * The action to be performed by this timer task.
- */
- @Override
- public void run()
- {
- long latestTime = System.currentTimeMillis();
- long deltaTime = latestTime - previousTime;
-
- if (deltaTime == 0)
- {
- return;
- }
- long entriesProcessed = rebuildManager.getEntriesProcess();
- long deltaCount = entriesProcessed - previousProcessed;
- float rate = 1000f * deltaCount / deltaTime;
- float completed = 0;
- if (rebuildManager.getTotalEntries() > 0)
- {
- completed = 100f * entriesProcessed / rebuildManager.getTotalEntries();
- }
- logger.info(NOTE_JEB_REBUILD_PROGRESS_REPORT, completed, entriesProcessed,
- rebuildManager.getTotalEntries(), rate);
- try
- {
- Runtime runtime = Runtime.getRuntime();
- long freeMemory = runtime.freeMemory() / MB;
- EnvironmentStats envStats =
- rootContainer.getEnvironmentStats(new StatsConfig());
- long nCacheMiss = envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
-
- float cacheMissRate = 0;
- if (deltaCount > 0)
- {
- cacheMissRate = nCacheMiss / (float) deltaCount;
- }
- logger.info(NOTE_JEB_REBUILD_CACHE_AND_MEMORY_REPORT, freeMemory, cacheMissRate);
- prevEnvStats = envStats;
- }
- catch (DatabaseException e)
- {
- // Unlikely to happen and not critical.
- }
- previousProcessed = entriesProcessed;
- previousTime = latestTime;
- }
- }
-
- /**
- * This class reports progress of first phase of import processing at fixed
- * intervals.
- */
- private final class FirstPhaseProgressTask extends TimerTask
- {
- /**
- * The number of entries that had been read at the time of the previous
- * progress report.
- */
- private long previousCount;
- /** The time in milliseconds of the previous progress report. */
- private long previousTime;
- /** The environment statistics at the time of the previous report. */
- private EnvironmentStats previousStats;
- /** Determines if eviction has been detected. */
- private boolean evicting;
- /** Entry count when eviction was detected. */
- private long evictionEntryCount;
-
- /** Create a new import progress task. */
- public FirstPhaseProgressTask()
- {
- previousTime = System.currentTimeMillis();
- try
- {
- previousStats = rootContainer.getEnvironmentStats(new StatsConfig());
- }
- catch (DatabaseException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /** The action to be performed by this timer task. */
- @Override
- public void run()
- {
- long latestCount = reader.getEntriesRead() + 0;
- long deltaCount = latestCount - previousCount;
- long latestTime = System.currentTimeMillis();
- long deltaTime = latestTime - previousTime;
- if (deltaTime == 0)
- {
- return;
- }
- long entriesRead = reader.getEntriesRead();
- long entriesIgnored = reader.getEntriesIgnored();
- long entriesRejected = reader.getEntriesRejected();
- float rate = 1000f * deltaCount / deltaTime;
- logger.info(NOTE_JEB_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate);
- try
- {
- Runtime runTime = Runtime.getRuntime();
- long freeMemory = runTime.freeMemory() / MB;
- EnvironmentStats environmentStats;
-
- //If first phase skip DN validation is specified use the root container
- //stats, else use the temporary environment stats.
- if (skipDNValidation)
- {
- environmentStats =
- rootContainer.getEnvironmentStats(new StatsConfig());
- }
- else
- {
- environmentStats = tmpEnv.getEnvironmentStats(new StatsConfig());
- }
- long nCacheMiss =
- environmentStats.getNCacheMiss() - previousStats.getNCacheMiss();
-
- float cacheMissRate = 0;
- if (deltaCount > 0)
- {
- cacheMissRate = nCacheMiss / (float) deltaCount;
- }
- logger.info(NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT, freeMemory, cacheMissRate);
- long evictPasses = environmentStats.getNEvictPasses();
- long evictNodes = environmentStats.getNNodesExplicitlyEvicted();
- long evictBinsStrip = environmentStats.getNBINsStripped();
- long cleanerRuns = environmentStats.getNCleanerRuns();
- long cleanerDeletions = environmentStats.getNCleanerDeletions();
- long cleanerEntriesRead = environmentStats.getNCleanerEntriesRead();
- long cleanerINCleaned = environmentStats.getNINsCleaned();
- long checkPoints = environmentStats.getNCheckpoints();
- if (evictPasses != 0)
- {
- if (!evicting)
- {
- evicting = true;
- evictionEntryCount = reader.getEntriesRead();
- logger.info(NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED, evictionEntryCount);
- }
- logger.info(NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS, evictPasses,
- evictNodes, evictBinsStrip);
- }
- if (cleanerRuns != 0)
- {
- logger.info(NOTE_JEB_IMPORT_LDIF_CLEANER_STATS, cleanerRuns,
- cleanerDeletions, cleanerEntriesRead, cleanerINCleaned);
- }
- if (checkPoints > 1)
- {
- logger.info(NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS, checkPoints);
- }
- previousStats = environmentStats;
- }
- catch (DatabaseException e)
- {
- // Unlikely to happen and not critical.
- }
- previousCount = latestCount;
- previousTime = latestTime;
- }
- }
-
- /**
- * This class reports progress of the second phase of import processing at
- * fixed intervals.
- */
- private class SecondPhaseProgressTask extends TimerTask
- {
- /**
- * The number of entries that had been read at the time of the previous
- * progress report.
- */
- private long previousCount;
- /** The time in milliseconds of the previous progress report. */
- private long previousTime;
- /** The environment statistics at the time of the previous report. */
- private EnvironmentStats previousStats;
- /** Determines if eviction has been detected. */
- private boolean evicting;
- private long latestCount;
-
- /**
- * Create a new import progress task.
- *
- * @param latestCount
- * The latest count of entries processed in phase one.
- */
- public SecondPhaseProgressTask(long latestCount)
- {
- previousTime = System.currentTimeMillis();
- this.latestCount = latestCount;
- try
- {
- previousStats = rootContainer.getEnvironmentStats(new StatsConfig());
- }
- catch (DatabaseException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /** The action to be performed by this timer task. */
- @Override
- public void run()
- {
- long deltaCount = latestCount - previousCount;
- long latestTime = System.currentTimeMillis();
- long deltaTime = latestTime - previousTime;
- if (deltaTime == 0)
- {
- return;
- }
- try
- {
- Runtime runTime = Runtime.getRuntime();
- long freeMemory = runTime.freeMemory() / MB;
- EnvironmentStats environmentStats =
- rootContainer.getEnvironmentStats(new StatsConfig());
- long nCacheMiss =
- environmentStats.getNCacheMiss() - previousStats.getNCacheMiss();
-
- float cacheMissRate = 0;
- if (deltaCount > 0)
- {
- cacheMissRate = nCacheMiss / (float) deltaCount;
- }
- logger.info(NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT, freeMemory, cacheMissRate);
- long evictPasses = environmentStats.getNEvictPasses();
- long evictNodes = environmentStats.getNNodesExplicitlyEvicted();
- long evictBinsStrip = environmentStats.getNBINsStripped();
- long cleanerRuns = environmentStats.getNCleanerRuns();
- long cleanerDeletions = environmentStats.getNCleanerDeletions();
- long cleanerEntriesRead = environmentStats.getNCleanerEntriesRead();
- long cleanerINCleaned = environmentStats.getNINsCleaned();
- long checkPoints = environmentStats.getNCheckpoints();
- if (evictPasses != 0)
- {
- if (!evicting)
- {
- evicting = true;
- }
- logger.info(NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS, evictPasses,
- evictNodes, evictBinsStrip);
- }
- if (cleanerRuns != 0)
- {
- logger.info(NOTE_JEB_IMPORT_LDIF_CLEANER_STATS, cleanerRuns,
- cleanerDeletions, cleanerEntriesRead, cleanerINCleaned);
- }
- if (checkPoints > 1)
- {
- logger.info(NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS, checkPoints);
- }
- previousStats = environmentStats;
- }
- catch (DatabaseException e)
- {
- // Unlikely to happen and not critical.
- }
- previousCount = latestCount;
- previousTime = latestTime;
-
- //Do DN index managers first.
- for (IndexManager indexMgrDN : DNIndexMgrList)
- {
- indexMgrDN.printStats(deltaTime);
- }
- //Do non-DN index managers.
- for (IndexManager indexMgr : indexMgrList)
- {
- indexMgr.printStats(deltaTime);
- }
- }
- }
-
- /**
- * A class to hold information about the entry determined by the LDIF reader.
- * Mainly the suffix the entry belongs under and the ID assigned to it by the
- * reader.
- */
- public class EntryInformation
- {
- private EntryID entryID;
- private Suffix suffix;
-
- /**
- * Return the suffix associated with the entry.
- *
- * @return Entry's suffix instance;
- */
- public Suffix getSuffix()
- {
- return suffix;
- }
-
- /**
- * Set the suffix instance associated with the entry.
- *
- * @param suffix
- * The suffix associated with the entry.
- */
- public void setSuffix(Suffix suffix)
- {
- this.suffix = suffix;
- }
-
- /**
- * Set the entry's ID.
- *
- * @param entryID
- * The entry ID to set the entry ID to.
- */
- public void setEntryID(EntryID entryID)
- {
- this.entryID = entryID;
- }
-
- /**
- * Return the entry ID associated with the entry.
- *
- * @return The entry ID associated with the entry.
- */
- public EntryID getEntryID()
- {
- return entryID;
- }
- }
-
- /**
- * This class defines the individual index type available.
- */
- public enum ImportIndexType
- {
- /** The DN index type. */
- DN,
- /** The equality index type. */
- EQUALITY,
- /** The presence index type. */
- PRESENCE,
- /** The sub-string index type. */
- SUBSTRING,
- /** The ordering index type. */
- ORDERING,
- /** The approximate index type. */
- APPROXIMATE,
- /** The extensible sub-string index type. */
- EX_SUBSTRING,
- /** The extensible shared index type. */
- EX_SHARED,
- /** The vlv index type. */
- VLV
- }
-
- /**
- * This class is used as an index key for hash maps that need to process
- * multiple suffix index elements into a single queue and/or maps based on
- * both attribute type and index type (ie., cn.equality, sn.equality,...).
- */
- public class IndexKey
- {
-
- private final AttributeType attributeType;
- private final ImportIndexType indexType;
- private final int entryLimit;
-
- /**
- * Create index key instance using the specified attribute type, index type
- * and index entry limit.
- *
- * @param attributeType
- * The attribute type.
- * @param indexType
- * The index type.
- * @param entryLimit
- * The entry limit for the index.
- */
- IndexKey(AttributeType attributeType, ImportIndexType indexType,
- int entryLimit)
- {
- this.attributeType = attributeType;
- this.indexType = indexType;
- this.entryLimit = entryLimit;
- }
-
- /**
- * An equals method that uses both the attribute type and the index type.
- * Only returns {@code true} if the attribute type and index type are equal.
- *
- * @param obj
- * the object to compare.
- * @return {@code true} if the objects are equal, or {@code false} if they
- * are not.
- */
- @Override
- public boolean equals(Object obj)
- {
- if (obj instanceof IndexKey)
- {
- IndexKey oKey = (IndexKey) obj;
- if (attributeType.equals(oKey.getAttributeType())
- && indexType.equals(oKey.getIndexType()))
- {
- return true;
- }
- }
- return false;
- }
-
- /**
- * A hash code method that adds the hash codes of the attribute type and
- * index type and returns that value.
- *
- * @return The combined hash values of attribute type hash code and the
- * index type hash code.
- */
- @Override
- public int hashCode()
- {
- return attributeType.hashCode() + indexType.hashCode();
- }
-
- /**
- * Return the attribute type.
- *
- * @return The attribute type.
- */
- public AttributeType getAttributeType()
- {
- return attributeType;
- }
-
- /**
- * Return the index type.
- *
- * @return The index type.
- */
- public ImportIndexType getIndexType()
- {
- return indexType;
- }
-
- /**
- * Return the index key name, which is the attribute type primary name, a
- * period, and the index type name. Used for building file names and
- * progress output.
- *
- * @return The index key name.
- */
- public String getName()
- {
- return attributeType.getPrimaryName() + "."
- + StaticUtils.toLowerCase(indexType.name());
- }
-
- /**
- * Return the entry limit associated with the index.
- *
- * @return The entry limit.
- */
- public int getEntryLimit()
- {
- return entryLimit;
- }
- }
-
- /**
- * The temporary environment will be shared when multiple suffixes are being
- * processed. This interface is used by those suffix instance to do parental
- * checking of the DN cache.
- */
- public static interface DNCache
- {
-
- /**
- * Returns {@code true} if the specified DN is contained in the DN cache, or
- * {@code false} otherwise.
- *
- * @param dn
- * The DN to check the presence of.
- * @return {@code true} if the cache contains the DN, or {@code false} if it
- * is not.
- * @throws DatabaseException
- * If an error occurs reading the database.
- */
- boolean contains(DN dn) throws DatabaseException;
- }
-
- /**
- * Temporary environment used to check DN's when DN validation is performed
- * during phase one processing. It is deleted after phase one processing.
- */
- public final class TmpEnv implements DNCache
- {
- private String envPath;
- private Environment environment;
- private static final String DB_NAME = "dn_cache";
- private Database dnCache;
-
- /**
- * Create a temporary DB environment and database to be used as a cache of
- * DNs when DN validation is performed in phase one processing.
- *
- * @param envPath
- * The file path to create the environment under.
- * @throws DatabaseException
- * If an error occurs either creating the environment or the DN
- * database.
- */
- public TmpEnv(File envPath) throws DatabaseException
- {
- EnvironmentConfig envConfig = new EnvironmentConfig();
- envConfig.setConfigParam(ENV_RUN_CLEANER, "true");
- envConfig.setReadOnly(false);
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(false);
- envConfig.setConfigParam(ENV_IS_LOCKING, "true");
- envConfig.setConfigParam(ENV_RUN_CHECKPOINTER, "false");
- envConfig.setConfigParam(EVICTOR_LRU_ONLY, "false");
- envConfig.setConfigParam(EVICTOR_NODES_PER_SCAN, "128");
- envConfig.setConfigParam(MAX_MEMORY, Long.toString(tmpEnvCacheSize));
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setAllowCreate(true);
- dbConfig.setTransactional(false);
- dbConfig.setTemporary(true);
- environment = new Environment(envPath, envConfig);
- dnCache = environment.openDatabase(null, DB_NAME, dbConfig);
- this.envPath = envPath.getPath();
- }
-
- private static final long FNV_INIT = 0xcbf29ce484222325L;
- private static final long FNV_PRIME = 0x100000001b3L;
-
- /** Hash the DN bytes. Uses the FNV-1a hash. */
- private byte[] hashCode(byte[] b)
- {
- long hash = FNV_INIT;
- for (byte aB : b)
- {
- hash ^= aB;
- hash *= FNV_PRIME;
- }
- return JebFormat.entryIDToDatabase(hash);
- }
-
- /**
- * Shutdown the temporary environment.
- *
- * @throws JebException
- * If error occurs.
- */
- public void shutdown() throws JebException
- {
- dnCache.close();
- environment.close();
- EnvManager.removeFiles(envPath);
- }
-
- /**
- * Insert the specified DN into the DN cache. It will return {@code true} if
- * the DN does not already exist in the cache and was inserted, or
- * {@code false} if the DN exists already in the cache.
- *
- * @param dn
- * The DN to insert in the cache.
- * @param val
- * A database entry to use in the insert.
- * @param key
- * A database entry to use in the insert.
- * @return {@code true} if the DN was inserted in the cache, or
- * {@code false} if the DN exists in the cache already and could not
- * be inserted.
- * @throws JebException
- * If an error occurs accessing the database.
- */
- public boolean insert(DN dn, DatabaseEntry val, DatabaseEntry key)
- throws JebException
- {
- // Use a compact representation for key
- byte[] dnBytesForKey = dn.toIrreversibleNormalizedByteString().toByteArray();
- key.setData(hashCode(dnBytesForKey));
-
- // Use a reversible representation for value
- byte[] dnBytesForValue = StaticUtils.getBytes(dn.toString());
- int len = PackedInteger.getWriteIntLength(dnBytesForValue.length);
- byte[] dataBytes = new byte[dnBytesForValue.length + len];
- int pos = PackedInteger.writeInt(dataBytes, 0, dnBytesForValue.length);
- System.arraycopy(dnBytesForValue, 0, dataBytes, pos, dnBytesForValue.length);
- val.setData(dataBytes);
-
- return insert(key, val, dnBytesForValue);
- }
-
- private boolean insert(DatabaseEntry key, DatabaseEntry val, byte[] dnBytesForValue)
- throws JebException
- {
- Cursor cursor = null;
- try
- {
- cursor = dnCache.openCursor(null, CursorConfig.DEFAULT);
- OperationStatus status = cursor.putNoOverwrite(key, val);
- if (status == OperationStatus.KEYEXIST)
- {
- DatabaseEntry dns = new DatabaseEntry();
- status = cursor.getSearchKey(key, dns, LockMode.RMW);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new JebException(LocalizableMessage.raw("Search DN cache failed."));
- }
- if (!isDNMatched(dns.getData(), dnBytesForValue))
- {
- addDN(dns.getData(), cursor, dnBytesForValue);
- return true;
- }
- return false;
- }
- return true;
- }
- finally
- {
- close(cursor);
- }
- }
-
- /** Add the DN to the DNs as because of a hash collision. */
- private void addDN(byte[] readDnBytes, Cursor cursor, byte[] dnBytesForValue) throws JebException
- {
- int pLen = PackedInteger.getWriteIntLength(dnBytesForValue.length);
- int totLen = readDnBytes.length + pLen + dnBytesForValue.length;
- byte[] newRec = new byte[totLen];
- System.arraycopy(readDnBytes, 0, newRec, 0, readDnBytes.length);
- int pos = PackedInteger.writeInt(newRec, readDnBytes.length, dnBytesForValue.length);
- System.arraycopy(dnBytesForValue, 0, newRec, pos, dnBytesForValue.length);
- DatabaseEntry newVal = new DatabaseEntry(newRec);
- OperationStatus status = cursor.putCurrent(newVal);
- if (status != OperationStatus.SUCCESS)
- {
- throw new JebException(LocalizableMessage.raw("Add of DN to DN cache failed."));
- }
- }
-
- /** Return true if the specified DN is in the DNs saved as a result of hash collisions. */
- private boolean isDNMatched(byte[] readDnBytes, byte[] dnBytes)
- {
- int pos = 0;
- while (pos < readDnBytes.length)
- {
- int pLen = PackedInteger.getReadIntLength(readDnBytes, pos);
- int len = PackedInteger.readInt(readDnBytes, pos);
- if (indexComparator.compare(readDnBytes, pos + pLen, len, dnBytes, dnBytes.length) == 0)
- {
- return true;
- }
- pos += pLen + len;
- }
- return false;
- }
-
- /**
- * Check if the specified DN is contained in the temporary DN cache.
- *
- * @param dn
- * A DN check for.
- * @return {@code true} if the specified DN is in the temporary DN cache, or
- * {@code false} if it is not.
- */
- @Override
- public boolean contains(DN dn)
- {
- Cursor cursor = null;
- DatabaseEntry key = new DatabaseEntry();
- byte[] dnBytesForKey = dn.toIrreversibleNormalizedByteString().toByteArray();
- key.setData(hashCode(dnBytesForKey));
- try
- {
- cursor = dnCache.openCursor(null, CursorConfig.DEFAULT);
- DatabaseEntry dns = new DatabaseEntry();
- OperationStatus status = cursor.getSearchKey(key, dns, LockMode.DEFAULT);
- byte[] dnBytesForValue = StaticUtils.getBytes(dn.toString());
- return status == OperationStatus.SUCCESS && isDNMatched(dns.getData(), dnBytesForValue);
- }
- finally
- {
- close(cursor);
- }
- }
-
- /**
- * Return temporary environment stats.
- *
- * @param statsConfig
- * A stats configuration instance.
- * @return Environment stats.
- * @throws DatabaseException
- * If an error occurs retrieving the stats.
- */
- public EnvironmentStats getEnvironmentStats(StatsConfig statsConfig)
- throws DatabaseException
- {
- return environment.getStats(statsConfig);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void diskLowThresholdReached(DiskSpaceMonitor monitor)
- {
- diskFullThresholdReached(monitor);
- }
-
- /** {@inheritDoc} */
- @Override
- public void diskFullThresholdReached(DiskSpaceMonitor monitor)
- {
- isCanceled = true;
- Arg3<Object, Number, Number> argMsg = !isPhaseOneDone
- ? ERR_IMPORT_LDIF_LACK_DISK_PHASE_ONE
- : ERR_IMPORT_LDIF_LACK_DISK_PHASE_TWO;
- logger.error(argMsg.get(monitor.getDirectory().getPath(), monitor.getFreeSpace(), monitor.getLowThreshold()));
- }
-
- /** {@inheritDoc} */
- @Override
- public void diskSpaceRestored(DiskSpaceMonitor monitor)
- {
- // Do nothing.
- }
-}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexInputBuffer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexInputBuffer.java
deleted file mode 100644
index d1e9ac6..0000000
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexInputBuffer.java
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2010 Sun Microsystems, Inc.
- * Portions Copyright 2012-2014 ForgeRock AS.
- */
-
-package org.opends.server.backends.jeb.importLDIF;
-
-
-
-import static org.opends.messages.JebMessages.ERR_JEB_IMPORT_BUFFER_IO_ERROR;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-import org.opends.server.backends.jeb.importLDIF.Importer.IndexManager;
-
-import com.sleepycat.util.PackedInteger;
-
-
-
-/**
- * The buffer class is used to process a buffer from the temporary index files
- * during phase 2 processing.
- */
-public final class IndexInputBuffer implements Comparable<IndexInputBuffer>
-{
-
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
- private final IndexManager indexMgr;
- private final FileChannel channel;
- private final long begin;
- private final long end;
- private final int id;
-
- private long offset;
- private final ByteBuffer cache;
- private Integer indexID = null;
- private ByteBuffer keyBuf = ByteBuffer.allocate(128);
-
-
-
- /**
- * Possible states while reading a record.
- */
- private enum RecordState
- {
- START, NEED_INSERT_ID_SET, NEED_DELETE_ID_SET
- }
-
-
-
- private RecordState recordState = RecordState.START;
-
-
-
- /**
- * Creates a new index input buffer.
- *
- * @param indexMgr
- * The index manager.
- * @param channel
- * The index file channel.
- * @param begin
- * The position of the start of the buffer in the scratch file.
- * @param end
- * The position of the end of the buffer in the scratch file.
- * @param id
- * The index ID.
- * @param cacheSize
- * The cache size.
- * @throws IOException
- * If an IO error occurred when priming the cache.
- */
- public IndexInputBuffer(IndexManager indexMgr, FileChannel channel,
- long begin, long end, int id, int cacheSize) throws IOException
- {
- this.indexMgr = indexMgr;
- this.channel = channel;
- this.begin = begin;
- this.end = end;
- this.offset = 0;
- this.id = id;
- this.cache = ByteBuffer.allocate(Math.max(cacheSize - 384, 256));
-
- loadCache();
- cache.flip();
- keyBuf.flip();
- }
-
-
-
- private void loadCache() throws IOException
- {
- channel.position(begin + offset);
- long leftToRead = end - (begin + offset);
- long bytesToRead;
- if (leftToRead < cache.remaining())
- {
- cache.limit((int) (cache.position() + leftToRead));
- bytesToRead = (int) leftToRead;
- }
- else
- {
- bytesToRead = Math.min((end - offset), cache.remaining());
- }
- int bytesRead = 0;
- while (bytesRead < bytesToRead)
- {
- bytesRead += channel.read(cache);
- }
- offset += bytesRead;
- indexMgr.addBytesRead(bytesRead);
- }
-
-
-
- /**
- * Returns {@code true} if this buffer has more data.
- *
- * @return {@code true} if this buffer has more data.
- * @throws IOException
- * If an IO error occurred.
- */
- public boolean hasMoreData() throws IOException
- {
- boolean ret = ((begin + offset) >= end);
- return !(cache.remaining() == 0 && ret);
- }
-
-
-
- /**
- * Returns the length of the next key.
- *
- * @return The length of the next key.
- */
- public int getKeyLen()
- {
- return keyBuf.limit();
- }
-
-
-
- /**
- * Returns the next key.
- *
- * @param b
- * A buffer into which the key should be added.
- */
- public void getKey(ByteBuffer b)
- {
- keyBuf.get(b.array(), 0, keyBuf.limit());
- b.limit(keyBuf.limit());
- }
-
-
-
- /**
- * Returns the index ID of the next record.
- *
- * @return The index ID of the next record.
- */
- public Integer getIndexID()
- {
- if (indexID == null)
- {
- try
- {
- getNextRecord();
- }
- catch (IOException ex)
- {
- logger.error(ERR_JEB_IMPORT_BUFFER_IO_ERROR, indexMgr
- .getBufferFileName());
- throw new RuntimeException(ex);
- }
- }
- return indexID;
- }
-
-
-
- /**
- * Reads the next record from the buffer, skipping any remaining data in the
- * current record.
- *
- * @throws IOException
- * If an IO error occurred.
- */
- public void getNextRecord() throws IOException
- {
- switch (recordState)
- {
- case START:
- // Nothing to skip.
- break;
- case NEED_INSERT_ID_SET:
- // The previous record's ID sets were not read, so skip them both.
- mergeIDSet(null);
- mergeIDSet(null);
- break;
- case NEED_DELETE_ID_SET:
- // The previous record's delete ID set was not read, so skip it.
- mergeIDSet(null);
- break;
- }
-
- indexID = getInt();
-
- ensureData(20);
- byte[] ba = cache.array();
- int p = cache.position();
- int len = PackedInteger.getReadIntLength(ba, p);
- int keyLen = PackedInteger.readInt(ba, p);
- cache.position(p + len);
- if (keyLen > keyBuf.capacity())
- {
- keyBuf = ByteBuffer.allocate(keyLen);
- }
- ensureData(keyLen);
- keyBuf.clear();
- cache.get(keyBuf.array(), 0, keyLen);
- keyBuf.limit(keyLen);
-
- recordState = RecordState.NEED_INSERT_ID_SET;
- }
-
-
-
- private int getInt() throws IOException
- {
- ensureData(4);
- return cache.getInt();
- }
-
-
-
- /**
- * Reads the next ID set from the record and merges it with the provided ID
- * set.
- *
- * @param idSet
- * The ID set to be merged.
- * @throws IOException
- * If an IO error occurred.
- */
- public void mergeIDSet(ImportIDSet idSet) throws IOException
- {
- if (recordState == RecordState.START)
- {
- throw new IllegalStateException();
- }
-
- ensureData(20);
- int p = cache.position();
- byte[] ba = cache.array();
- int len = PackedInteger.getReadIntLength(ba, p);
- int keyCount = PackedInteger.readInt(ba, p);
- p += len;
- cache.position(p);
- for (int k = 0; k < keyCount; k++)
- {
- if (ensureData(9))
- {
- p = cache.position();
- }
- len = PackedInteger.getReadLongLength(ba, p);
- long l = PackedInteger.readLong(ba, p);
- p += len;
- cache.position(p);
-
- // idSet will be null if skipping.
- if (idSet != null)
- {
- idSet.addEntryID(l);
- }
- }
-
- switch (recordState)
- {
- case START:
- throw new IllegalStateException();
- case NEED_INSERT_ID_SET:
- recordState = RecordState.NEED_DELETE_ID_SET;
- break;
- case NEED_DELETE_ID_SET:
- recordState = RecordState.START;
- break;
- }
- }
-
-
-
- private boolean ensureData(int len) throws IOException
- {
- boolean ret = false;
- if (cache.remaining() == 0)
- {
- cache.clear();
- loadCache();
- cache.flip();
- ret = true;
- }
- else if (cache.remaining() < len)
- {
- cache.compact();
- loadCache();
- cache.flip();
- ret = true;
- }
- return ret;
- }
-
-
-
- /**
- * Compares this buffer with the provided key and index ID.
- *
- * @param cKey
- * The key.
- * @param cIndexID
- * The index ID.
- * @return A negative number if this buffer is less than the provided key and
- * index ID, a positive number if this buffer is greater, or zero if
- * it is the same.
- */
- int compare(ByteBuffer cKey, Integer cIndexID)
- {
- int returnCode, rc;
- if (keyBuf.limit() == 0)
- {
- getIndexID();
- }
- rc = Importer.indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(),
- cKey.array(), cKey.limit());
- if (rc != 0)
- {
- returnCode = 1;
- }
- else
- {
- returnCode = (indexID.intValue() == cIndexID.intValue()) ? 0 : 1;
- }
- return returnCode;
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- public int compareTo(IndexInputBuffer o)
- {
- // used in remove.
- if (this == o)
- {
- return 0;
- }
-
- if (keyBuf.limit() == 0)
- {
- getIndexID();
- }
-
- if (o.keyBuf.limit() == 0)
- {
- o.getIndexID();
- }
-
- byte[] oKey = o.keyBuf.array();
- int oLen = o.keyBuf.limit();
- int returnCode = Importer.indexComparator.compare(keyBuf.array(), 0,
- keyBuf.limit(), oKey, oLen);
- if (returnCode == 0)
- {
- returnCode = indexID.intValue() - o.getIndexID().intValue();
- if (returnCode == 0)
- {
- returnCode = id - o.id;
- }
- }
- return returnCode;
- }
-}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java
deleted file mode 100644
index e5fdcd8..0000000
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java
+++ /dev/null
@@ -1,1001 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2013-2015 ForgeRock AS.
- */
-package org.opends.server.backends.jeb.importLDIF;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.opends.server.backends.jeb.EntryID;
-
-import com.sleepycat.util.PackedInteger;
-
-/**
- * This class represents a index buffer used to store the keys and entry IDs
- * processed from the LDIF file during phase one of an import, or rebuild index
- * process. Each key and ID is stored in a record in the buffer.
- * <p>
- * The records in the buffer are eventually sorted, based on the key, when the
- * maximum size of the buffer is reached and no more records will fit into the
- * buffer. The buffer is scheduled to be flushed to an index scratch file and
- * then re-cycled by the import, or rebuild-index process.
- * </p>
- * <p>
- * The structure of a record in the buffer is the following:
- *
- * <pre>
- * +-------------+-------------+---------+---------+------------+-----------+
- * | record size | INS/DEL bit | indexID | entryID | key length | key bytes |
- * +-------------+-------------+---------+---------+------------+-----------+
- * </pre>
- *
- * The record size is used for fast seeks to quickly "jump" over records.
- * </p>
- * <p>
- * The records are packed as much as possible, to optimize the buffer space.<br>
- * This class is not thread safe.
- * </p>
- */
-public final class IndexOutputBuffer implements Comparable<IndexOutputBuffer> {
-
- /** Enumeration used when sorting a buffer. */
- private enum CompareOp {
- LT, GT, LE, GE, EQ
- }
-
- /** The size of a Java int. A Java int is 32 bits, i.e. 4 bytes. */
- private static final int INT_SIZE = 4;
-
- /**
- * The record overhead. In addition to entryID, key length and key bytes, the
- * record overhead includes the indexID + INS/DEL bit
- */
- private static final int REC_OVERHEAD = INT_SIZE + 1;
-
- /** Buffer records are either insert records or delete records. */
- private static final byte DEL = 0, INS = 1;
-
- /** The size of a buffer. */
- private final int size;
- /** Byte array holding the actual buffer data. */
- private final byte buffer[];
-
- /**
- * Used to break a tie (keys equal) when the buffers are being merged
- * for writing to the index scratch file.
- */
- private long id;
-
- /** Temporary buffer used to store integer values. */
- private final byte[] intBytes = new byte[INT_SIZE];
-
- /** OffSet where next key is written. */
- private int keyOffset;
- /** OffSet where next value record is written. */
- private int recordOffset;
- /** Amount of bytes left in the buffer. */
- private int bytesLeft;
- /** Number of keys in the buffer. */
- private int keys;
- /** Used to iterate over the buffer when writing to a scratch file. */
- private int position;
-
- /** The comparator to use sort the keys. */
- private ComparatorBuffer<byte[]> comparator;
-
- /**
- * Used to make sure that an instance of this class is put on the
- * correct scratch file writer work queue for processing.
- */
- private Importer.IndexKey indexKey;
-
- /** Initial capacity of re-usable buffer used in key compares. */
- private static final int CAP = 32;
-
- /**
- * This buffer is reused during key compares. It's main purpose is to keep
- * memory footprint as small as possible.
- */
- private ByteBuffer keyBuffer = ByteBuffer.allocate(CAP);
-
- /**
- * Set to {@code true} if the buffer should not be recycled. Used when the
- * importer/rebuild index process is doing phase one cleanup and flushing
- * buffers not completed.
- */
- private boolean discarded;
-
-
- /**
- * Create an instance of a IndexBuffer using the specified size.
- *
- * @param size The size of the underlying byte array.
- */
- public IndexOutputBuffer(int size) {
- this.size = size;
- this.buffer = new byte[size];
- this.bytesLeft = size;
- this.recordOffset = size - 1;
- }
-
-
- /**
- * Reset an IndexBuffer so it can be re-cycled.
- */
- public void reset() {
- bytesLeft = size;
- keyOffset = 0;
- recordOffset = size - 1;
- keys = 0;
- position = 0;
- comparator = null;
- indexKey = null;
- }
-
- /**
- * Creates a new poison buffer. Poison buffers are used to stop the processing of import tasks.
- *
- * @return a new poison buffer
- */
- public static IndexOutputBuffer poison()
- {
- return new IndexOutputBuffer(0);
- }
-
- /**
- * Set the ID of a buffer to the specified value.
- *
- * @param id The value to set the ID to.
- */
- public void setID(long id)
- {
- this.id = id;
- }
-
-
- /**
- * Return the ID of a buffer.
- *
- * @return The value of a buffer's ID.
- */
- private long getBufferID()
- {
- return this.id;
- }
-
-
- /**
- * Determines if a buffer is a poison buffer. A poison buffer is used to
- * shutdown work queues when import/rebuild index phase one is completed.
- * A poison buffer has a 0 size.
- *
- * @return {@code true} if a buffer is a poison buffer, or {@code false}
- * otherwise.
- */
- public boolean isPoison()
- {
- return size == 0;
- }
-
- /**
- * Determines if buffer should be re-cycled by calling {@link #reset()}.
- *
- * @return {@code true} if buffer should be recycled, or {@code false} if it
- * should not.
- */
- public boolean isDiscarded()
- {
- return discarded;
- }
-
- /**
- * Sets the discarded flag to {@code true}.
- */
- public void discard()
- {
- discarded = true;
- }
-
- /**
- * Returns {@code true} if there is enough space available to write the
- * specified byte array in the buffer. It returns {@code false} otherwise.
- *
- * @param kBytes The byte array to check space against.
- * @param id The id value to check space against.
- * @return {@code true} if there is space to write the byte array in a
- * buffer, or {@code false} otherwise.
- */
- public boolean isSpaceAvailable(byte[] kBytes, long id) {
- return getRequiredSize(kBytes.length, id) < bytesLeft;
- }
-
- /**
- * Set the comparator to be used in the buffer processing to the specified
- * comparator.
- *
- * @param comparator The comparator to set the buffer's comparator to.
- */
- public void setComparator(ComparatorBuffer<byte[]> comparator)
- {
- this.comparator = comparator;
- }
-
-
- /**
- * Return a buffer's current position value.
- *
- * @return The buffer's current position value.
- */
- public int getPosition()
- {
- return position;
- }
-
-
- /**
- * Set a buffer's position value to the specified position.
- *
- * @param position The value to set the position to.
- */
- public void setPosition(int position)
- {
- this.position = position;
- }
-
-
- /**
- * Sort the buffer.
- */
- public void sort() {
- sort(0, keys);
- }
-
-
- /**
- * Add the specified key byte array and EntryID to the buffer.
- *
- * @param keyBytes The key byte array.
- * @param entryID The EntryID.
- * @param indexID The index ID the record belongs.
- * @param insert <CODE>True</CODE> if key is an insert, false otherwise.
- */
- public void add(byte[] keyBytes, EntryID entryID, int indexID,
- boolean insert) {
- // write the record data, but leave the space to write the record size just
- // before it
- recordOffset = addRecord(keyBytes, entryID.longValue(), indexID, insert);
- // then write the returned record size
- System.arraycopy(getIntBytes(recordOffset), 0, buffer, keyOffset, INT_SIZE);
- keyOffset += INT_SIZE;
- bytesLeft = recordOffset - keyOffset;
- keys++;
- }
-
-
- /**
- * Writes the full record minus the record size itself.
- */
- private int addRecord(byte[]key, long id, int indexID, boolean insert)
- {
- int retOffset = recordOffset - getRecordSize(key.length, id);
- int offSet = retOffset;
-
- // write the INS/DEL bit
- buffer[offSet++] = insert ? INS : DEL;
- // write the indexID
- System.arraycopy(getIntBytes(indexID), 0, buffer, offSet, INT_SIZE);
- offSet += INT_SIZE;
- // write the entryID
- offSet = PackedInteger.writeLong(buffer, offSet, id);
- // write the key length
- offSet = PackedInteger.writeInt(buffer, offSet, key.length);
- // write the key bytes
- System.arraycopy(key, 0, buffer, offSet, key.length);
- return retOffset;
- }
-
-
- /**
- * Computes the full size of the record.
- *
- * @param keyLen The length of the key of index
- * @param id The entry id
- * @return The size that such record would take in the IndexOutputBuffer
- */
- public static int getRequiredSize(int keyLen, long id)
- {
- // Adds up the key length + key bytes + entryID + indexID + the INS/DEL bit
- // and finally the space needed to store the record size
- return getRecordSize(keyLen, id) + INT_SIZE;
- }
-
- private static int getRecordSize(int keyLen, long id)
- {
- // Adds up the key length + key bytes + ...
- return PackedInteger.getWriteIntLength(keyLen) + keyLen +
- // ... entryID + (indexID + INS/DEL bit).
- PackedInteger.getWriteLongLength(id) + REC_OVERHEAD;
- }
-
-
- /**
- * Write record at specified index to the specified output stream. Used when
- * when writing the index scratch files.
-
- * @param stream The stream to write the record at the index to.
- * @param index The index of the record to write.
- */
- public void writeID(ByteArrayOutputStream stream, int index)
- {
- int offSet = getIntegerValue(index * INT_SIZE);
- int len = PackedInteger.getReadLongLength(buffer, offSet + REC_OVERHEAD);
- stream.write(buffer, offSet + REC_OVERHEAD, len);
- }
-
-
- /**
- * Return {@code true} if the record specified by the index is an insert
- * record, or {@code false} if it a delete record.
- *
- * @param index The index of the record.
- *
- * @return {@code true} if the record is an insert record, or {@code false}
- * if it is a delete record.
- */
- public boolean isInsertRecord(int index)
- {
- int recOffset = getIntegerValue(index * INT_SIZE);
- return buffer[recOffset] != DEL;
- }
-
-
- /**
- * Return the size of the key part of the record.
- *
- * @return The size of the key part of the record.
- */
- public int getKeySize()
- {
- int offSet = getIntegerValue(position * INT_SIZE) + REC_OVERHEAD;
- offSet += PackedInteger.getReadIntLength(buffer, offSet);
- return PackedInteger.readInt(buffer, offSet);
- }
-
-
- /**
- * Return the key value part of a record indicated by the current buffer
- * position.
- *
- * @return byte array containing the key value.
- */
- public byte[] getKey()
- {
- return getKey(position);
- }
-
- /** Used to minimized memory usage when comparing keys. */
- private ByteBuffer getKeyBuf(int x)
- {
- keyBuffer.clear();
- int offSet = getIntegerValue(x * INT_SIZE) + REC_OVERHEAD;
- offSet += PackedInteger.getReadIntLength(buffer, offSet);
- int keyLen = PackedInteger.readInt(buffer, offSet);
- offSet += PackedInteger.getReadIntLength(buffer, offSet);
- //Re-allocate if the key is bigger than the capacity.
- if(keyLen > keyBuffer.capacity())
- {
- keyBuffer = ByteBuffer.allocate(keyLen);
- }
- keyBuffer.put(buffer, offSet, keyLen);
- keyBuffer.flip();
- return keyBuffer;
- }
-
-
- /**
- * Return the key value part of a record specified by the index.
- *
- * @param x index to return.
- * @return byte array containing the key value.
- */
- private byte[] getKey(int x)
- {
- int offSet = getIntegerValue(x * INT_SIZE) + REC_OVERHEAD;
- offSet += PackedInteger.getReadIntLength(buffer, offSet);
- int keyLen = PackedInteger.readInt(buffer, offSet);
- offSet += PackedInteger.getReadIntLength(buffer, offSet);
- byte[] key = new byte[keyLen];
- System.arraycopy(buffer, offSet, key, 0, keyLen);
- return key;
- }
-
-
- private int getIndexID(int x)
- {
- return getIntegerValue(getIntegerValue(x * INT_SIZE) + 1);
- }
-
-
- /**
- * Return index id associated with the current position's record.
- *
- * @return The index id.
- */
- public int getIndexID()
- {
- return getIntegerValue(getIntegerValue(position * INT_SIZE) + 1);
- }
-
-
- private boolean is(int x, int y, CompareOp op)
- {
- int xoffSet = getIntegerValue(x * INT_SIZE);
- int xIndexID = getIntegerValue(xoffSet + 1);
- xoffSet += REC_OVERHEAD;
- xoffSet += PackedInteger.getReadIntLength(buffer, xoffSet);
- int xKeyLen = PackedInteger.readInt(buffer, xoffSet);
- int xKey = PackedInteger.getReadIntLength(buffer, xoffSet) + xoffSet;
- int yoffSet = getIntegerValue(y * INT_SIZE);
- int yIndexID = getIntegerValue(yoffSet + 1);
- yoffSet += REC_OVERHEAD;
- yoffSet += PackedInteger.getReadIntLength(buffer, yoffSet);
- int yKeyLen = PackedInteger.readInt(buffer, yoffSet);
- int yKey = PackedInteger.getReadIntLength(buffer, yoffSet) + yoffSet;
- return evaluateReturnCode(comparator.compare(buffer, xKey, xKeyLen,
- xIndexID, yKey, yKeyLen, yIndexID), op);
- }
-
-
- private boolean is(int x, byte[] yKey, CompareOp op, int yIndexID)
- {
- int xoffSet = getIntegerValue(x * INT_SIZE);
- int xIndexID = getIntegerValue(xoffSet + 1);
- xoffSet += REC_OVERHEAD;
- xoffSet += PackedInteger.getReadIntLength(buffer, xoffSet);
- int xKeyLen = PackedInteger.readInt(buffer, xoffSet);
- int xKey = PackedInteger.getReadIntLength(buffer, xoffSet) + xoffSet;
- return evaluateReturnCode(comparator.compare(buffer, xKey, xKeyLen,
- xIndexID, yKey, yKey.length, yIndexID), op);
- }
-
-
- /**
- * Compare the byte array at the current position with the specified one and
- * using the specified index id. It will return {@code true} if the byte
- * array at the current position is equal to the specified byte array as
- * determined by the comparator and the index ID is is equal. It will
- * return {@code false} otherwise.
- *
- * @param b The byte array to compare.
- * @param bIndexID The index key.
- * @return <CODE>True</CODE> if the byte arrays are equal.
- */
- public boolean compare(byte[]b, int bIndexID)
- {
- int offset = getIntegerValue(position * INT_SIZE);
- int indexID = getIntegerValue(offset + 1);
- offset += REC_OVERHEAD;
- offset += PackedInteger.getReadIntLength(buffer, offset);
- int keyLen = PackedInteger.readInt(buffer, offset);
- int key = PackedInteger.getReadIntLength(buffer, offset) + offset;
- return comparator.compare(buffer, key, keyLen, b, b.length) == 0
- && indexID == bIndexID;
- }
-
-
- /**
- * Compare current IndexBuffer to the specified index buffer using both the
- * comparator and index ID of both buffers.
- *
- * The key at the value of position in both buffers are used in the compare.
- *
- * @param b The IndexBuffer to compare to.
- * @return 0 if the buffers are equal, -1 if the current buffer is less
- * than the specified buffer, or 1 if it is greater.
- */
- @Override
- public int compareTo(IndexOutputBuffer b)
- {
- final ByteBuffer keyBuf = b.getKeyBuf(b.position);
- int offset = getIntegerValue(position * INT_SIZE);
- int indexID = getIntegerValue(offset + 1);
- offset += REC_OVERHEAD;
- offset += PackedInteger.getReadIntLength(buffer, offset);
- int keyLen = PackedInteger.readInt(buffer, offset);
- int key = PackedInteger.getReadIntLength(buffer, offset) + offset;
-
- final int cmp = comparator.compare(buffer, key, keyLen, keyBuf.array(), keyBuf.limit());
- if (cmp != 0)
- {
- return cmp;
- }
-
- final int bIndexID = b.getIndexID();
- if (indexID == bIndexID)
- {
- // This is tested in a tree set remove when a buffer is removed from the tree set.
- return compare(this.id, b.getBufferID());
- }
- else if (indexID < bIndexID)
- {
- return -1;
- }
- else
- {
- return 1;
- }
- }
-
- private int compare(long l1, long l2)
- {
- if (l1 == l2)
- {
- return 0;
- }
- else if (l1 < l2)
- {
- return -1;
- }
- else
- {
- return 1;
- }
- }
-
-
- /**
- * Write a record to specified output stream using the record pointed to by
- * the current position and the specified byte stream of ids.
- *
- * @param dataStream The data output stream to write to.
- *
- * @throws IOException If an I/O error occurs writing the record.
- */
- public void writeKey(DataOutputStream dataStream) throws IOException
- {
- int offSet = getIntegerValue(position * INT_SIZE) + REC_OVERHEAD;
- offSet += PackedInteger.getReadIntLength(buffer, offSet);
- int keyLen = PackedInteger.readInt(buffer, offSet);
- offSet += PackedInteger.getReadIntLength(buffer, offSet);
- dataStream.write(buffer, offSet, keyLen);
- }
-
- /**
- * Compare the byte array at the current position with the byte array at the
- * specified index.
- *
- * @param i The index pointing to the byte array to compare.
- * @return {@code true} if the byte arrays are equal, or {@code false} otherwise
- */
- public boolean compare(int i)
- {
- return is(i, position, CompareOp.EQ);
- }
-
- /**
- * Return the current number of keys.
- *
- * @return The number of keys currently in an index buffer.
- */
- public int getNumberKeys()
- {
- return keys;
- }
-
- /**
- * Return {@code true} if the buffer has more data to process, or
- * {@code false} otherwise. Used when iterating over the buffer writing the
- * scratch index file.
- *
- * @return {@code true} if a buffer has more data to process, or
- * {@code false} otherwise.
- */
- public boolean hasMoreData()
- {
- return position + 1 < keys;
- }
-
- /**
- * Advance the position pointer to the next record in the buffer. Used when
- * iterating over the buffer examining keys.
- */
- public void nextRecord()
- {
- position++;
- }
-
- private byte[] getIntBytes(int val)
- {
- for (int i = 3; i >= 0; i--) {
- intBytes[i] = (byte) (val & 0xff);
- val >>>= 8;
- }
- return intBytes;
- }
-
-
- private int getIntegerValue(int index)
- {
- int answer = 0;
- for (int i = 0; i < INT_SIZE; i++) {
- byte b = buffer[index + i];
- answer <<= 8;
- answer |= (b & 0xff);
- }
- return answer;
- }
-
-
- private int med3(int a, int b, int c)
- {
- return is(a, b, CompareOp.LT) ?
- (is(b,c,CompareOp.LT) ? b : is(a,c,CompareOp.LT) ? c : a) :
- (is(b,c,CompareOp.GT) ? b : is(a,c,CompareOp.GT) ? c : a);
- }
-
-
- private void sort(int off, int len)
- {
- if (len < 7) {
- for (int i=off; i<len+off; i++)
- {
- for (int j=i; j>off && is(j-1, j, CompareOp.GT); j--)
- {
- swap(j, j-1);
- }
- }
- return;
- }
-
- int m = off + (len >> 1);
- if (len > 7) {
- int l = off;
- int n = off + len - 1;
- if (len > 40) {
- int s = len/8;
- l = med3(l, l+s, l+2*s);
- m = med3(m-s, m, m+s);
- n = med3(n-2*s, n-s, n);
- }
- m = med3(l, m, n);
- }
-
- byte[] mKey = getKey(m);
- int mIndexID = getIndexID(m);
-
- int a = off, b = a, c = off + len - 1, d = c;
- while(true)
- {
- while (b <= c && is(b, mKey, CompareOp.LE, mIndexID))
- {
- if (is(b, mKey, CompareOp.EQ, mIndexID))
- {
- swap(a++, b);
- }
- b++;
- }
- while (c >= b && is(c, mKey, CompareOp.GE, mIndexID))
- {
- if (is(c, mKey, CompareOp.EQ, mIndexID))
- {
- swap(c, d--);
- }
- c--;
- }
- if (b > c)
- {
- break;
- }
- swap(b++, c--);
- }
-
- // Swap partition elements back to middle
- int s, n = off + len;
- s = Math.min(a-off, b-a );
- vectorSwap(off, b-s, s);
- s = Math.min(d-c, n-d-1);
- vectorSwap(b, n-s, s);
-
- s = b - a;
- // Recursively sort non-partition-elements
- if (s > 1)
- {
- sort(off, s);
- }
- s = d - c;
- if (s > 1)
- {
- sort(n-s, s);
- }
- }
-
-
- private void swap(int a, int b)
- {
- int aOffset = a * INT_SIZE;
- int bOffset = b * INT_SIZE;
- int bVal = getIntegerValue(bOffset);
- System.arraycopy(buffer, aOffset, buffer, bOffset, INT_SIZE);
- System.arraycopy(getIntBytes(bVal), 0, buffer, aOffset, INT_SIZE);
- }
-
-
- private void vectorSwap(int a, int b, int n)
- {
- for (int i=0; i<n; i++, a++, b++)
- {
- swap(a, b);
- }
- }
-
-
- private boolean evaluateReturnCode(int rc, CompareOp op)
- {
- switch(op) {
- case LT:
- return rc < 0;
- case GT:
- return rc > 0;
- case LE:
- return rc <= 0;
- case GE:
- return rc >= 0;
- case EQ:
- return rc == 0;
- default:
- return false;
- }
- }
-
-
- /**
- * Interface that defines two methods used to compare keys used in this
- * class. The Comparator interface cannot be used in this class, so this
- * special one is used that knows about the special properties of this class.
- *
- * @param <T> object to use in the compare
- */
- public static interface ComparatorBuffer<T> {
-
-
- /**
- * Compare two offsets in an object, usually a byte array.
- *
- * @param o The object.
- * @param offset The first offset.
- * @param length The first length.
- * @param indexID The first index id.
- * @param otherOffset The second offset.
- * @param otherLength The second length.
- * @param otherIndexID The second index id.
- * @return a negative integer, zero, or a positive integer as the first
- * offset value is less than, equal to, or greater than the second.
- */
- int compare(T o, int offset, int length, int indexID, int otherOffset,
- int otherLength, int otherIndexID);
-
-
- /**
- * Compare an offset in an object with the specified object.
- *
- * @param o The first object.
- * @param offset The first offset.
- * @param length The first length.
- * @param indexID The first index id.
- * @param other The second object.
- * @param otherLength The length of the second object.
- * @param otherIndexID The second index id.
- * @return a negative integer, zero, or a positive integer as the first
- * offset value is less than, equal to, or greater than the second
- * object.
- */
- int compare(T o, int offset, int length, int indexID, T other,
- int otherLength, int otherIndexID);
-
-
- /**
- * Compare an offset in an object with the specified object.
- *
- * @param o The first object.
- * @param offset The first offset.
- * @param length The first length.
- * @param other The second object.
- * @param otherLen The length of the second object.
- * @return a negative integer, zero, or a positive integer as the first
- * offset value is less than, equal to, or greater than the second
- * object.
- */
- int compare(T o, int offset, int length, T other,
- int otherLen);
-
- }
-
-
- /**
- * Implementation of ComparatorBuffer interface. Used to compare keys when
- * they are non-DN indexes.
- */
- public static class IndexComparator implements IndexOutputBuffer.ComparatorBuffer<byte[]>
- {
-
- /**
- * Compare two offsets in an byte array using the index compare
- * algorithm. The specified index ID is used in the comparison if the
- * byte arrays are equal.
- *
- * @param b The byte array.
- * @param offset The first offset.
- * @param length The first length.
- * @param indexID The first index id.
- * @param otherOffset The second offset.
- * @param otherLength The second length.
- * @param otherIndexID The second index id.
- * @return a negative integer, zero, or a positive integer as the first
- * offset value is less than, equal to, or greater than the second.
- */
- @Override
- public int compare(byte[] b, int offset, int length, int indexID,
- int otherOffset, int otherLength, int otherIndexID)
- {
- for(int i = 0; i < length && i < otherLength; i++)
- {
- if(b[offset + i] > b[otherOffset + i])
- {
- return 1;
- }
- else if (b[offset + i] < b[otherOffset + i])
- {
- return -1;
- }
- }
- return compareLengthThenIndexID(length, indexID, otherLength, otherIndexID);
- }
-
- /**
- * Compare an offset in an byte array with the specified byte array,
- * using the DN compare algorithm. The specified index ID is used in the
- * comparison if the byte arrays are equal.
- *
- * @param b The byte array.
- * @param offset The first offset.
- * @param length The first length.
- * @param indexID The first index id.
- * @param other The second byte array to compare to.
- * @param otherLength The second byte array's length.
- * @param otherIndexID The second index id.
- * @return a negative integer, zero, or a positive integer as the first
- * offset value is less than, equal to, or greater than the second
- * byte array.
- */
- @Override
- public int compare(byte[] b, int offset, int length, int indexID,
- byte[] other, int otherLength, int otherIndexID)
- {
- for(int i = 0; i < length && i < otherLength; i++)
- {
- if(b[offset + i] > other[i])
- {
- return 1;
- }
- else if (b[offset + i] < other[i])
- {
- return -1;
- }
- }
- return compareLengthThenIndexID(length, indexID, otherLength, otherIndexID);
- }
-
- /**
- * The arrays are equal, make sure they are in the same index
- * since multiple suffixes might have the same key.
- */
- private int compareLengthThenIndexID(int length, int indexID, int otherLength, int otherIndexID)
- {
- if (length == otherLength)
- {
- return compare(indexID, otherIndexID);
- }
- else if (length > otherLength)
- {
- return 1;
- }
- else
- {
- return -1;
- }
- }
-
- /**
- * Compare an offset in an byte array with the specified byte array,
- * using the DN compare algorithm.
- *
- * @param b The byte array.
- * @param offset The first offset.
- * @param length The first length.
- * @param other The second byte array to compare to.
- * @param otherLength The second byte array's length.
- * @return a negative integer, zero, or a positive integer as the first
- * offset value is less than, equal to, or greater than the second
- * byte array.
- */
- @Override
- public int compare(byte[] b, int offset, int length, byte[] other,
- int otherLength)
- {
- for(int i = 0; i < length && i < otherLength; i++)
- {
- if(b[offset + i] > other[i])
- {
- return 1;
- }
- else if (b[offset + i] < other[i])
- {
- return -1;
- }
- }
- return compare(length, otherLength);
- }
-
- private int compare(int i1, int i2)
- {
- if (i1 == i2)
- {
- return 0;
- }
- else if (i1 > i2)
- {
- return 1;
- }
- else
- {
- return -1;
- }
- }
- }
-
-
- /**
- * Set the index key associated with an index buffer.
- *
- * @param indexKey The index key.
- */
- public void setIndexKey(Importer.IndexKey indexKey)
- {
- this.indexKey = indexKey;
- }
-
-
- /**
- * Return the index key of an index buffer.
- * @return The index buffer's index key.
- */
- public Importer.IndexKey getIndexKey()
- {
- return indexKey;
- }
-}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java
deleted file mode 100644
index 19f6c4d..0000000
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2014-2015 ForgeRock AS
- */
-package org.opends.server.backends.jeb.importLDIF;
-
-import static org.opends.messages.JebMessages.*;
-import static org.opends.server.util.ServerConstants.*;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.opends.server.backends.jeb.*;
-import org.opends.server.backends.jeb.importLDIF.Importer.DNCache;
-import org.opends.server.types.AttributeType;
-import org.opends.server.types.DN;
-
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.LockMode;
-
-/**
- * The class represents a suffix that is to be loaded during an import, or
- * rebuild index process. Multiple instances of this class can be instantiated
- * during and import to support multiple suffixes in a backend. A rebuild
- * index has only one of these instances.
- */
-public class Suffix
-{
-
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
- private final List<DN> includeBranches, excludeBranches;
- private final DN baseDN;
- private final EntryContainer srcEntryContainer;
- private final EntryContainer entryContainer;
- private final Object synchObject = new Object();
- private static final int PARENT_ID_SET_SIZE = 16 * 1024;
- private final ConcurrentHashMap<DN, CountDownLatch> pendingMap =
- new ConcurrentHashMap<DN, CountDownLatch>();
- private final Set<DN> parentSet = new HashSet<DN>(PARENT_ID_SET_SIZE);
- private DN parentDN;
-
- /**
- * Creates a suffix instance using the specified parameters.
- *
- * @param entryContainer The entry container pertaining to the suffix.
- * @param srcEntryContainer The original entry container.
- * @param includeBranches The include branches.
- * @param excludeBranches The exclude branches.
- */
- Suffix(EntryContainer entryContainer, EntryContainer srcEntryContainer,
- List<DN> includeBranches, List<DN> excludeBranches)
- {
- this.entryContainer = entryContainer;
- this.srcEntryContainer = srcEntryContainer;
- this.baseDN = entryContainer.getBaseDN();
- if (includeBranches != null)
- {
- this.includeBranches = includeBranches;
- }
- else
- {
- this.includeBranches = new ArrayList<DN>(0);
- }
- if (excludeBranches != null)
- {
- this.excludeBranches = excludeBranches;
- }
- else
- {
- this.excludeBranches = new ArrayList<DN>(0);
- }
- }
-
- /**
- * Returns the DN2ID instance pertaining to a suffix instance.
- *
- * @return A DN2ID instance that can be used to manipulate the DN2ID database.
- */
- public DN2ID getDN2ID()
- {
- return entryContainer.getDN2ID();
- }
-
-
- /**
- * Returns the ID2Entry instance pertaining to a suffix instance.
- *
- * @return A ID2Entry instance that can be used to manipulate the ID2Entry
- * database.
- */
- public ID2Entry getID2Entry()
- {
- return entryContainer.getID2Entry();
- }
-
-
- /**
- * Returns the DN2URI instance pertaining to a suffix instance.
- *
- * @return A DN2URI instance that can be used to manipulate the DN2URI
- * database.
- */
- public DN2URI getDN2URI()
- {
- return entryContainer.getDN2URI();
- }
-
-
- /**
- * Returns the entry container pertaining to a suffix instance.
- *
- * @return The entry container used to create a suffix instance.
- */
- public EntryContainer getEntryContainer()
- {
- return entryContainer;
- }
-
-
- /**
- * Return the Attribute Type - Index map used to map an attribute type to an
- * index instance.
- *
- * @return A suffixes Attribute Type - Index map.
- */
- public Map<AttributeType, AttributeIndex> getAttrIndexMap()
- {
- return entryContainer.getAttributeIndexMap();
- }
-
-
- /**
- * Make sure the specified parent DN is not in the pending map.
- *
- * @param parentDN The DN of the parent.
- */
- private void assureNotPending(DN parentDN) throws InterruptedException
- {
- final CountDownLatch l = pendingMap.get(parentDN);
- if (l != null)
- {
- l.await();
- }
- }
-
-
- /**
- * Add specified DN to the pending map.
- *
- * @param dn The DN to add to the map.
- */
- public void addPending(DN dn)
- {
- pendingMap.putIfAbsent(dn, new CountDownLatch(1));
- }
-
-
- /**
- * Remove the specified DN from the pending map, it may not exist if the
- * entries are being migrated so just return.
- *
- * @param dn The DN to remove from the map.
- */
- public void removePending(DN dn)
- {
- CountDownLatch l = pendingMap.remove(dn);
- if(l != null)
- {
- l.countDown();
- }
- }
-
-
- /**
- * Return {@code true} if the specified dn is contained in the parent set, or
- * in the specified DN cache. This would indicate that the parent has already
- * been processed. It returns {@code false} otherwise.
- *
- * It will optionally check the dn2id database for the dn if the specified
- * cleared backend boolean is {@code true}.
- *
- * @param dn The DN to check for.
- * @param dnCache The importer DN cache.
- * @param clearedBackend Set to {@code true} if the import process cleared the
- * backend before processing.
- * @return {@code true} if the dn is contained in the parent ID, or
- * {@code false} otherwise.
- *
- * @throws DatabaseException If an error occurred searching the DN cache, or
- * dn2id database.
- * @throws InterruptedException If an error occurred processing the pending
- * map.
- */
- public
- boolean isParentProcessed(DN dn, DNCache dnCache, boolean clearedBackend)
- throws DatabaseException, InterruptedException {
- synchronized(synchObject) {
- if(parentSet.contains(dn))
- {
- return true;
- }
- }
- //The DN was not in the parent set. Make sure it isn't pending.
- try {
- assureNotPending(dn);
- } catch (InterruptedException e) {
- logger.error(ERR_JEB_IMPORT_LDIF_PENDING_ERR, e.getMessage());
- throw e;
- }
- // Either parent is in the DN cache,
- // or else check the dn2id database for the DN (only if backend wasn't cleared)
- final boolean parentThere = dnCache.contains(dn)
- || (!clearedBackend
- && getDN2ID().get(null, dn, LockMode.DEFAULT) != null);
- //Add the DN to the parent set if needed.
- if (parentThere) {
- synchronized(synchObject) {
- if (parentSet.size() >= PARENT_ID_SET_SIZE) {
- Iterator<DN> iterator = parentSet.iterator();
- iterator.next();
- iterator.remove();
- }
- parentSet.add(dn);
- }
- }
- return parentThere;
- }
-
-
- /**
- * Sets the trusted status of all of the indexes, vlvIndexes, id2children
- * and id2subtree indexes.
- *
- * @param trusted True if the indexes should be trusted or false
- * otherwise.
- *
- * @throws DatabaseException If an error occurred setting the indexes to
- * trusted.
- */
- public void setIndexesTrusted(boolean trusted) throws DatabaseException
- {
- entryContainer.getID2Children().setTrusted(null,trusted);
- entryContainer.getID2Subtree().setTrusted(null, trusted);
- for (AttributeIndex attributeIndex : entryContainer.getAttributeIndexes())
- {
- setTrusted(attributeIndex.getEqualityIndex(), trusted);
- setTrusted(attributeIndex.getPresenceIndex(), trusted);
- setTrusted(attributeIndex.getSubstringIndex(), trusted);
- setTrusted(attributeIndex.getOrderingIndex(), trusted);
- setTrusted(attributeIndex.getApproximateIndex(), trusted);
- Map<String, Collection<Index>> exIndexes = attributeIndex.getExtensibleIndexes();
- if(!exIndexes.isEmpty())
- {
- setTrusted(exIndexes.get(EXTENSIBLE_INDEXER_ID_SUBSTRING), trusted);
- setTrusted(exIndexes.get(EXTENSIBLE_INDEXER_ID_SHARED), trusted);
- }
- }
- for(VLVIndex vlvIdx : entryContainer.getVLVIndexes()) {
- vlvIdx.setTrusted(null, trusted);
- }
- }
-
- private void setTrusted(Index index, boolean trusted)
- {
- if (index != null)
- {
- index.setTrusted(null, trusted);
- }
- }
-
- private void setTrusted(Collection<Index> subIndexes, boolean trusted)
- {
- if (subIndexes != null)
- {
- for (Index subIndex : subIndexes)
- {
- subIndex.setTrusted(null, trusted);
- }
- }
- }
-
- /**
- * Get the parent DN of the last entry added to a suffix.
- *
- * @return The parent DN of the last entry added.
- */
- public DN getParentDN()
- {
- return parentDN;
- }
-
-
- /**
- * Set the parent DN of the last entry added to a suffix.
- *
- * @param parentDN The parent DN to save.
- */
- public void setParentDN(DN parentDN)
- {
- this.parentDN = parentDN;
- }
-
- /**
- * Return a src entry container.
- *
- * @return The src entry container.
- */
- public EntryContainer getSrcEntryContainer()
- {
- return this.srcEntryContainer;
- }
-
-
- /**
- * Return include branches.
- *
- * @return The include branches.
- */
- public List<DN> getIncludeBranches()
- {
- return this.includeBranches;
- }
-
-
- /**
- * Return exclude branches.
- *
- * @return the exclude branches.
- */
- public List<DN> getExcludeBranches()
- {
- return this.excludeBranches;
- }
-
-
- /**
- * Return base DN.
- *
- * @return The base DN.
- */
- public DN getBaseDN()
- {
- return this.baseDN;
- }
-}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/package-info.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/package-info.java
deleted file mode 100644
index 91a9da0..0000000
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/package-info.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2008 Sun Microsystems, Inc.
- */
-
-
-
-/**
- * Contains the code for the import LDIF JEB backend.
- */
-@org.opends.server.types.PublicAPI(
- stability=org.opends.server.types.StabilityLevel.PRIVATE)
-package org.opends.server.backends.jeb.importLDIF;
-
--
Gitblit v1.10.0