opendj3-server-dev/src/server/org/opends/server/backends/pluggable/EntryIDSet.java
@@ -101,9 +101,7 @@ else if ((bytes.byteAt(0) & 0x80) == 0x80) { // Entry limit has exceeded and there is an encoded undefined set size. undefinedSize = bytes.length() == 8 ? bytes.toLong() & Long.MAX_VALUE // remove top bit : Long.MAX_VALUE; undefinedSize = decodeUndefinedSize(bytes); } else { @@ -113,7 +111,28 @@ } } private long[] decodeEntryIDList(ByteString bytes) /** * Decodes and returns the undefined size out of the provided byte string. * * @param bytes * the encoded undefined size * @return the undefined size */ static long decodeUndefinedSize(ByteString bytes) { return bytes.length() == 8 ? bytes.toLong() & Long.MAX_VALUE // remove top bit : Long.MAX_VALUE; } /** * Decodes and returns the entryID list out of the provided byte sequence. * * @param bytes * the encoded entryID list * @return a long array representing the entryID list */ static long[] decodeEntryIDList(ByteSequence bytes) { final ByteSequenceReader reader = bytes.asReader(); final int count = bytes.length() / 8; @@ -301,9 +320,9 @@ if (isDefined()) { final ByteStringBuilder builder = new ByteStringBuilder(8 * values.length); for (int i = 0; i < values.length; i++) for (long value : values) { builder.append(values[i]); builder.append(value); } return builder.toByteString(); } opendj3-server-dev/src/server/org/opends/server/backends/pluggable/ImportIDSet.java
New file
@@ -0,0 +1,478 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2015 ForgeRock AS
 */
package org.opends.server.backends.pluggable;

import java.nio.ByteBuffer;

import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;

/**
 * This class manages the set of IDs that are to be eventually added to an
 * index database. It is responsible for determining if the number of IDs is
 * above the configured ID limit. If the limit is reached, the class stops
 * tracking individual IDs and marks the set as undefined.
 * <p>
 * This class is not thread safe.
 */
class ImportIDSet
{
  /** The internal array where elements are stored, kept sorted in ascending order. */
  private long[] array;
  /** The number of valid elements in the array. */
  private int count;
  /** Boolean to keep track if the instance is defined or not. */
  private boolean isDefined = true;
  /** Size of the undefined set if a count is kept. */
  private long undefinedSize;
  /** Key related to an ID set. */
  private ByteBuffer key;
  /** The entry limit size. */
  private final int limit;
  /** Set to true if a count of IDs above the entry limit should be kept. */
  private final boolean doCount;

  /**
   * Create an import ID set of the specified size, index limit and index
   * maintain count, plus an extra 128 slots.
   *
   * @param size The size of the underlying array, plus some extra space.
   * @param limit The index entry limit.
   * @param doCount The index maintain count.
   */
  public ImportIDSet(int size, int limit, boolean doCount)
  {
    this.array = new long[size + 128];
    // A limit of 0 means unlimited.
    this.limit = limit == 0 ? Integer.MAX_VALUE : limit;
    this.doCount = doCount;
  }

  /** Create an empty import instance. */
  public ImportIDSet()
  {
    this.limit = -1;
    this.doCount = false;
  }

  /** Clear the set so it can be reused again. The entry limit and maintain-count settings are kept. */
  public void clear()
  {
    undefinedSize = 0;
    isDefined = true;
    count = 0;
  }

  /**
   * Return if an import ID set is defined or not.
   *
   * @return <CODE>True</CODE> if an import ID set is defined.
   */
  public boolean isDefined()
  {
    return isDefined;
  }

  /** Set an import ID set to undefined, releasing the backing array. */
  private void setUndefined()
  {
    array = null;
    isDefined = false;
  }

  /**
   * Add the specified entry id to an import ID set.
   *
   * @param entryID The entry ID to add to an import ID set.
   */
  void addEntryID(EntryID entryID)
  {
    addEntryID(entryID.longValue());
  }

  /**
   * Add the specified long value to an import ID set. A negative value, or
   * exceeding the entry limit, flips the set to undefined.
   *
   * @param l The long value to add to an import ID set.
   */
  void addEntryID(long l)
  {
    if (!isDefined())
    {
      if (doCount)
      {
        undefinedSize++;
      }
      return;
    }
    // The set is known to be defined at this point.
    if (l < 0 || count + 1 > limit)
    {
      setUndefined();
      if (doCount)
      {
        undefinedSize = count + 1;
      }
      else
      {
        undefinedSize = Long.MAX_VALUE;
      }
      count = 0;
    }
    else
    {
      add(l);
    }
  }

  /**
   * Merge while maintaining an exact undefined-size count. Returns whether the
   * merge caused this set to exceed the entry limit for the first time.
   */
  private boolean mergeCount(ByteString dBbytes, ImportIDSet importIdSet)
  {
    boolean incrementLimitCount = false;
    boolean dbUndefined = isDBUndefined(dBbytes);

    if (dbUndefined && !importIdSet.isDefined())
    {
      undefinedSize = EntryIDSet.decodeUndefinedSize(dBbytes) + importIdSet.undefinedSize;
      isDefined = false;
    }
    else if (dbUndefined && importIdSet.isDefined())
    {
      undefinedSize = EntryIDSet.decodeUndefinedSize(dBbytes) + importIdSet.size();
      isDefined = false;
    }
    else if (!importIdSet.isDefined())
    {
      int dbSize = EntryIDSet.decodeEntryIDList(dBbytes).length;
      undefinedSize = dbSize + importIdSet.undefinedSize;
      isDefined = false;
      incrementLimitCount = true;
    }
    else
    {
      array = EntryIDSet.decodeEntryIDList(dBbytes);
      if (array.length + importIdSet.size() > limit)
      {
        undefinedSize = array.length + importIdSet.size();
        isDefined = false;
        incrementLimitCount = true;
      }
      else
      {
        count = array.length;
        addAll(importIdSet);
      }
    }
    return incrementLimitCount;
  }

  /**
   * Remove the specified import ID set from the byte array read from the DB.
   *
   * @param bytes The byte array read from JEB.
   * @param importIdSet The import ID set to delete.
   */
  public void remove(ByteSequence bytes, ImportIDSet importIdSet)
  {
    if (isDBUndefined(bytes))
    {
      isDefined = false;
      importIdSet.setUndefined();
      undefinedSize = Long.MAX_VALUE;
    }
    else if (!importIdSet.isDefined())
    {
      isDefined = false;
      undefinedSize = Long.MAX_VALUE;
    }
    else
    {
      array = EntryIDSet.decodeEntryIDList(bytes);
      if (array.length - importIdSet.size() > limit)
      {
        isDefined = false;
        count = 0;
        importIdSet.setUndefined();
        undefinedSize = Long.MAX_VALUE;
      }
      else
      {
        count = array.length;
        removeAll(importIdSet);
      }
    }
  }

  /**
   * Merge the specified byte array read from a DB, with the specified import
   * ID set. The specified limit and maintain count parameters define
   * if the newly merged set is defined or not.
   *
   * @param bytes The byte array of IDs read from a DB.
   * @param importIdSet The import ID set to merge the byte array with.
   * @return <CODE>True</CODE> if the import ID set started keeping a count as
   *         a result of the merge.
   */
  public boolean merge(ByteString bytes, ImportIDSet importIdSet)
  {
    boolean incrementLimitCount = false;

    if (doCount)
    {
      incrementLimitCount = mergeCount(bytes, importIdSet);
    }
    else if (isDBUndefined(bytes))
    {
      isDefined = false;
      importIdSet.setUndefined();
      undefinedSize = Long.MAX_VALUE;
      count = 0;
    }
    else if (!importIdSet.isDefined())
    {
      isDefined = false;
      incrementLimitCount = true;
      undefinedSize = Long.MAX_VALUE;
      count = 0;
    }
    else
    {
      array = EntryIDSet.decodeEntryIDList(bytes);
      if (array.length + importIdSet.size() > limit)
      {
        isDefined = false;
        incrementLimitCount = true;
        count = 0;
        importIdSet.setUndefined();
        undefinedSize = Long.MAX_VALUE;
      }
      else
      {
        count = array.length;
        addAll(importIdSet);
      }
    }
    return incrementLimitCount;
  }

  /** Returns true if the DB-encoded value has the undefined marker (top bit of first byte set). */
  private boolean isDBUndefined(ByteSequence bytes)
  {
    return (bytes.byteAt(0) & 0x80) == 0x80;
  }

  /** Removes from this set every ID present in {@code that}, preserving order. */
  private void removeAll(ImportIDSet that)
  {
    long[] newArray = new long[array.length];
    int c = 0;
    for (int i = 0; i < count; i++)
    {
      if (binarySearch(that.array, that.count, array[i]) < 0)
      {
        newArray[c++] = array[i];
      }
    }
    array = newArray;
    count = c;
  }

  /** Merges the sorted IDs of {@code that} into this sorted set, de-duplicating on the way. */
  private void addAll(ImportIDSet that)
  {
    resize(this.count + that.count);

    if (that.count == 0)
    {
      return;
    }

    // Optimize for the case where the two sets are sure to have no overlap.
    if (this.count == 0 || that.array[0] > this.array[this.count - 1])
    {
      System.arraycopy(that.array, 0, this.array, this.count, that.count);
      count += that.count;
      return;
    }

    if (this.array[0] > that.array[that.count - 1])
    {
      System.arraycopy(this.array, 0, this.array, that.count, this.count);
      System.arraycopy(that.array, 0, this.array, 0, that.count);
      count += that.count;
      return;
    }

    int destPos = binarySearch(this.array, this.count, that.array[0]);
    if (destPos < 0)
    {
      destPos = -(destPos + 1);
    }

    // Make space for the copy.
    int aCount = this.count - destPos;
    int aPos = destPos + that.count;
    int aEnd = aPos + aCount;
    System.arraycopy(this.array, destPos, this.array, aPos, aCount);

    // Optimize for the case where there is no overlap.
    if (this.array[aPos] > that.array[that.count - 1])
    {
      System.arraycopy(that.array, 0, this.array, destPos, that.count);
      count += that.count;
      return;
    }

    // Standard two-way merge of the displaced tail and the other set.
    int bPos;
    for (bPos = 0; aPos < aEnd && bPos < that.count;)
    {
      if (this.array[aPos] < that.array[bPos])
      {
        this.array[destPos++] = this.array[aPos++];
      }
      else if (this.array[aPos] > that.array[bPos])
      {
        this.array[destPos++] = that.array[bPos++];
      }
      else
      {
        // Duplicate ID: keep one copy only.
        this.array[destPos++] = this.array[aPos++];
        bPos++;
      }
    }

    // Copy any remainder.
    int aRemain = aEnd - aPos;
    if (aRemain > 0)
    {
      System.arraycopy(this.array, aPos, this.array, destPos, aRemain);
      destPos += aRemain;
    }
    int bRemain = that.count - bPos;
    if (bRemain > 0)
    {
      System.arraycopy(that.array, bPos, this.array, destPos, bRemain);
      destPos += bRemain;
    }
    count = destPos;
  }

  /**
   * Return the number of IDs in an import ID set.
   *
   * @return The current size of an import ID set.
   */
  public int size()
  {
    return count;
  }

  /** Inserts {@code v} keeping the array sorted. Returns false if it was already present. */
  private boolean add(long v)
  {
    resize(count + 1);

    if (count == 0 || v > array[count - 1])
    {
      array[count++] = v;
      return true;
    }

    int pos = binarySearch(array, count, v);
    if (pos >= 0)
    {
      return false;
    }

    // For a negative return value r, the index -(r+1) gives the array
    // index at which the specified value can be inserted to maintain
    // the sorted order of the array.
    pos = -(pos + 1);
    System.arraycopy(array, pos, array, pos + 1, count - pos);
    array[pos] = v;
    count++;
    return true;
  }

  /**
   * Binary search over the first {@code count} elements of {@code a}.
   * Returns the index of {@code key} if found, otherwise {@code -(insertionPoint + 1)}
   * (same contract as {@link java.util.Arrays#binarySearch(long[], long)}).
   */
  private static int binarySearch(long[] a, int count, long key)
  {
    int low = 0;
    int high = count - 1;

    while (low <= high)
    {
      // Unsigned shift avoids the (low + high) int overflow pitfall and makes
      // the evaluation order explicit ('+' binds tighter than shifts).
      int mid = (low + high) >>> 1;
      long midVal = a[mid];

      if (midVal < key)
      {
        low = mid + 1;
      }
      else if (midVal > key)
      {
        high = mid - 1;
      }
      else
      {
        return mid; // key found
      }
    }
    return -(low + 1); // key not found.
  }

  /** Grows the backing array (doubling) until it can hold {@code size} elements. */
  private void resize(int size)
  {
    if (array == null)
    {
      array = new long[size];
    }
    else if (array.length < size)
    {
      // Expand the size of the array in powers of two.
      int newSize = array.length == 0 ? 1 : array.length;
      do
      {
        newSize *= 2;
      }
      while (newSize < size);

      long[] newBytes = new long[newSize];
      System.arraycopy(array, 0, newBytes, 0, count);
      array = newBytes;
    }
  }

  /**
   * Create a byte string representing this object's key that is suitable to write to a DB.
   *
   * @return A byte string representing this object's key
   */
  ByteString keyToByteString()
  {
    // Read the key once instead of calling the accessor twice.
    final ByteBuffer k = getKey();
    return ByteString.wrap(k.array(), 0, k.limit());
  }

  /**
   * Create a byte string representing this object's value that is suitable to write to a DB.
   *
   * @return A byte string representing this object's value
   */
  public ByteString valueToByteString()
  {
    if (isDefined)
    {
      return encodeDefined();
    }
    return ByteString.valueOf(undefinedSize);
  }

  /** Encodes the defined set as the concatenation of its IDs, 8 bytes each. */
  private ByteString encodeDefined()
  {
    int encodedSize = count * 8;
    ByteStringBuilder bytes = new ByteStringBuilder(encodedSize);
    for (int i = 0; i < count; i++)
    {
      bytes.append(array[i]);
    }
    return bytes.toByteString();
  }

  /**
   * Set the DB key related to an import ID set.
   *
   * @param key Byte array containing the key.
   */
  public void setKey(ByteBuffer key)
  {
    this.key = key;
  }

  /**
   * Return the DB key related to an import ID set.
   *
   * @return The byte array containing the key.
   */
  public ByteBuffer getKey()
  {
    return key;
  }
}
opendj3-server-dev/src/server/org/opends/server/backends/pluggable/ImportLDIFReader.java
New file
@@ -0,0 +1,273 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2015 ForgeRock AS
 */
package org.opends.server.backends.pluggable;

import static org.opends.messages.UtilityMessages.*;
import static org.opends.server.util.StaticUtils.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.LocalizableMessageBuilder;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.Reject;
import org.opends.server.api.plugin.PluginResult;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.ObjectClass;
import org.opends.server.util.LDIFException;
import org.opends.server.util.LDIFReader;

/** This class specializes the LDIFReader for imports. */
final class ImportLDIFReader extends LDIFReader
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /** Used to allocate an entry ID for each entry accepted for import. */
  private final RootContainer rootContainer;

  /**
   * Creates a new LDIF reader that will read information from the specified file.
   *
   * @param importConfig
   *          The import configuration for this LDIF reader. It must not be <CODE>null</CODE>.
   * @param rootContainer
   *          The root container needed to get the next entry ID.
   * @throws IOException
   *           If a problem occurs while opening the LDIF file for reading.
   */
  public ImportLDIFReader(LDIFImportConfig importConfig, RootContainer rootContainer) throws IOException
  {
    super(importConfig);
    Reject.ifNull(importConfig, rootContainer);
    this.rootContainer = rootContainer;
  }

  /**
   * Reads the next entry from the LDIF source.
   *
   * @param suffixesMap
   *          A map of suffixes instances.
   * @param entryInfo
   *          A object to hold information about the entry ID and what suffix was selected.
   * @return The next entry read from the LDIF source, or <CODE>null</CODE> if the end of the LDIF
   *         data is reached.
   * @throws IOException
   *           If an I/O problem occurs while reading from the file.
   * @throws LDIFException
   *           If the information read cannot be parsed as an LDIF entry.
   */
  public final Entry readEntry(Map<DN, Suffix> suffixesMap, Importer.EntryInformation entryInfo)
      throws IOException, LDIFException
  {
    final boolean checkSchema = importConfig.validateSchema();
    while (true)
    {
      LinkedList<StringBuilder> lines;
      DN entryDN;
      EntryID entryID;
      Suffix suffix;
      // Reading the lines, allocating the entry ID and registering the pending
      // DN must happen atomically with respect to other reader threads.
      synchronized (this)
      {
        // Read the set of lines that make up the next entry.
        lines = readEntryLines();
        if (lines == null)
        {
          return null;
        }
        lastEntryBodyLines = lines;
        lastEntryHeaderLines = new LinkedList<StringBuilder>();

        // Read the DN of the entry and see if it is one that should be included
        // in the import.
        try
        {
          entryDN = readDN(lines);
        }
        catch (LDIFException e)
        {
          logger.traceException(e);
          continue;
        }

        if (entryDN == null)
        {
          // This should only happen if the LDIF starts with the "version:" line
          // and has a blank line immediately after that. In that case, simply
          // read and return the next entry.
          continue;
        }
        else if (!importConfig.includeEntry(entryDN))
        {
          logger.trace("Skipping entry %s because the DN is not one that "
              + "should be included based on the include and exclude branches.", entryDN);
          entriesRead.incrementAndGet();
          logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
          continue;
        }
        suffix = Importer.getMatchSuffix(entryDN, suffixesMap);
        if (suffix == null)
        {
          logger.trace("Skipping entry %s because the DN is not one that "
              + "should be included based on a suffix match check.", entryDN);
          entriesRead.incrementAndGet();
          logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
          continue;
        }
        entriesRead.incrementAndGet();
        entryID = rootContainer.getNextEntryID();
        suffix.addPending(entryDN);
      }

      // Create the entry and see if it is one that should be included in the import
      final Entry entry = createEntry(lines, entryDN, checkSchema, suffix);
      if (entry == null
          || !isIncludedInImport(entry, suffix, lines)
          || !invokeImportPlugins(entry, suffix, lines)
          || (checkSchema && !isValidAgainstSchema(entry, suffix, lines)))
      {
        continue;
      }

      entryInfo.setEntryID(entryID);
      entryInfo.setSuffix(suffix);
      // The entry should be included in the import, so return it.
      return entry;
    }
  }

  /**
   * Parses the entry's attributes from the LDIF lines. Returns {@code null}
   * (after logging, skipping and un-registering the pending DN) if parsing fails.
   */
  private Entry createEntry(List<StringBuilder> lines, DN entryDN, boolean checkSchema, Suffix suffix)
  {
    // Read the set of attributes from the entry.
    Map<ObjectClass, String> objectClasses = new HashMap<ObjectClass, String>();
    Map<AttributeType, List<AttributeBuilder>> userAttrBuilders =
        new HashMap<AttributeType, List<AttributeBuilder>>();
    Map<AttributeType, List<AttributeBuilder>> operationalAttrBuilders =
        new HashMap<AttributeType, List<AttributeBuilder>>();
    try
    {
      for (StringBuilder line : lines)
      {
        readAttribute(lines, line, entryDN, objectClasses, userAttrBuilders, operationalAttrBuilders, checkSchema);
      }
    }
    catch (LDIFException e)
    {
      if (logger.isTraceEnabled())
      {
        // Fixed: the two concatenated literals previously rendered as "readingits".
        logger.trace("Skipping entry %s because reading its attributes failed.", entryDN);
      }
      logToSkipWriter(lines, ERR_LDIF_READ_ATTR_SKIP.get(entryDN, e.getMessage()));
      suffix.removePending(entryDN);
      return null;
    }

    final Entry entry = new Entry(entryDN, objectClasses,
        toAttributesMap(userAttrBuilders), toAttributesMap(operationalAttrBuilders));
    logger.trace("readEntry(), created entry: %s", entry);
    return entry;
  }

  /**
   * Checks the entry against the import's include/exclude filters. On a
   * rejection or a filter-evaluation failure, the entry is skipped and its
   * pending DN is removed.
   */
  private boolean isIncludedInImport(Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
  {
    final DN entryDN = entry.getName();
    try
    {
      if (!importConfig.includeEntry(entry))
      {
        if (logger.isTraceEnabled())
        {
          logger.trace("Skipping entry %s because the DN is not one that "
              + "should be included based on the include and exclude filters.", entryDN);
        }
        logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN));
        suffix.removePending(entryDN);
        return false;
      }
    }
    catch (Exception e)
    {
      logger.traceException(e);
      logToSkipWriter(lines,
          ERR_LDIF_COULD_NOT_EVALUATE_FILTERS_FOR_IMPORT.get(entry.getName(), lastEntryLineNumber, e));
      // Fixed: removePending() was previously invoked twice for the same DN here.
      suffix.removePending(entryDN);
      return false;
    }
    return true;
  }

  /**
   * Runs the configured LDIF import plugins against the entry. On rejection,
   * the entry is logged to the reject writer and its pending DN is removed.
   */
  private boolean invokeImportPlugins(final Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
  {
    if (importConfig.invokeImportPlugins())
    {
      PluginResult.ImportLDIF pluginResult = pluginConfigManager.invokeLDIFImportPlugins(importConfig, entry);
      if (!pluginResult.continueProcessing())
      {
        final DN entryDN = entry.getName();
        LocalizableMessage m;
        LocalizableMessage rejectMessage = pluginResult.getErrorMessage();
        if (rejectMessage == null)
        {
          m = ERR_LDIF_REJECTED_BY_PLUGIN_NOMESSAGE.get(entryDN);
        }
        else
        {
          m = ERR_LDIF_REJECTED_BY_PLUGIN.get(entryDN, rejectMessage);
        }
        logToRejectWriter(lines, m);
        suffix.removePending(entryDN);
        return false;
      }
    }
    return true;
  }

  /**
   * Validates the entry against the schema, after filling in RDN attributes
   * and superior object classes. On violation, the entry is logged to the
   * reject writer and its pending DN is removed.
   */
  private boolean isValidAgainstSchema(Entry entry, Suffix suffix, LinkedList<StringBuilder> lines)
  {
    final DN entryDN = entry.getName();
    addRDNAttributesIfNecessary(entryDN, entry.getUserAttributes(), entry.getOperationalAttributes());
    // Add any superior objectclass(s) missing in the objectclass map.
    addSuperiorObjectClasses(entry.getObjectClasses());

    LocalizableMessageBuilder invalidReason = new LocalizableMessageBuilder();
    if (!entry.conformsToSchema(null, false, true, false, invalidReason))
    {
      LocalizableMessage message = ERR_LDIF_SCHEMA_VIOLATION.get(entryDN, lastEntryLineNumber, invalidReason);
      logToRejectWriter(lines, message);
      suffix.removePending(entryDN);
      return false;
    }
    return true;
  }
}
opendj3-server-dev/src/server/org/opends/server/backends/pluggable/Importer.java
New file Diff too large opendj3-server-dev/src/server/org/opends/server/backends/pluggable/Index.java
@@ -106,6 +106,8 @@ */ private boolean rebuildRunning; private final ImportIDSet newImportIDSet; /** * Create a new index object. * @param name The name of the index database within the entryContainer. @@ -132,6 +134,7 @@ this.indexEntryLimit = indexEntryLimit; this.cursorEntryLimit = cursorEntryLimit; this.maintainCount = maintainCount; this.newImportIDSet = new ImportIDSet(indexEntryLimit, indexEntryLimit, maintainCount); this.state = state; this.trusted = state.getIndexTrustState(txn, this); @@ -156,6 +159,60 @@ } /** * Delete the specified import ID set from the import ID set associated with the key. * * @param txn The database transaction * @param key The key to delete the set from. * @param importIdSet The import ID set to delete. * @throws StorageRuntimeException If a database error occurs. */ void delete(WriteableStorage txn, ByteSequence key, ImportIDSet importIdSet) throws StorageRuntimeException { ByteString value = read(txn, key, false); if (value != null) { newImportIDSet.clear(); newImportIDSet.remove(value, importIdSet); if (newImportIDSet.isDefined() && newImportIDSet.size() == 0) { delete(txn, key); } else { value = newImportIDSet.valueToByteString(); put(txn, key, value); } } else { // Should never happen -- the keys should always be there. throw new RuntimeException(); } } /** * Insert the specified import ID set into this index. Creates a DB cursor if needed. * * @param txn The database transaction * @param key The key to add the set to. * @param importIdSet The set of import IDs. * @throws StorageRuntimeException If a database error occurs. 
*/ void insert(WriteableStorage txn, ByteSequence key, ImportIDSet importIdSet) throws StorageRuntimeException { ByteString value = read(txn, key, false); if(value != null) { newImportIDSet.clear(); if (newImportIDSet.merge(value, importIdSet)) { entryLimitExceededCount++; } value = newImportIDSet.valueToByteString(); put(txn, key, value); } else { if(!importIdSet.isDefined()) { entryLimitExceededCount++; } value = importIdSet.valueToByteString(); put(txn, key, value); } } /** * Update the set of entry IDs for a given key. * * @param txn A database transaction, or null if none is required. opendj3-server-dev/src/server/org/opends/server/backends/pluggable/IndexInputBuffer.java
New file
@@ -0,0 +1,387 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions Copyright 2012-2015 ForgeRock AS.
 */
package org.opends.server.backends.pluggable;

import static org.opends.messages.JebMessages.*;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.backends.pluggable.Importer.IndexManager;

import com.sleepycat.util.PackedInteger;

/**
 * The buffer class is used to process a buffer from the temporary index files
 * during phase 2 processing.
 */
final class IndexInputBuffer implements Comparable<IndexInputBuffer>
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  private final IndexManager indexMgr;
  /** Channel over the scratch file this buffer reads from. */
  private final FileChannel channel;
  /** Start position of this buffer's region in the scratch file. */
  private final long begin;
  /** End position (exclusive) of this buffer's region in the scratch file. */
  private final long end;
  /** Tie-breaker used when two buffers compare equal on key and index ID. */
  private final int id;

  /** Number of bytes consumed so far, relative to {@code begin}. */
  private long offset;
  /** In-memory window over the scratch file region. */
  private final ByteBuffer cache;
  /** Index ID of the current record; null until the next record is read. */
  private Integer indexID;
  /** Holds the current record's key; grown on demand. */
  private ByteBuffer keyBuf = ByteBuffer.allocate(128);

  /** Possible states while reading a record. */
  private static enum RecordState
  {
    START, NEED_INSERT_ID_SET, NEED_DELETE_ID_SET
  }

  private RecordState recordState = RecordState.START;

  /**
   * Creates a new index input buffer.
   *
   * @param indexMgr
   *          The index manager.
   * @param channel
   *          The index file channel.
   * @param begin
   *          The position of the start of the buffer in the scratch file.
   * @param end
   *          The position of the end of the buffer in the scratch file.
   * @param id
   *          The index ID.
   * @param cacheSize
   *          The cache size.
   * @throws IOException
   *           If an IO error occurred when priming the cache.
   */
  public IndexInputBuffer(IndexManager indexMgr, FileChannel channel,
      long begin, long end, int id, int cacheSize) throws IOException
  {
    this.indexMgr = indexMgr;
    this.channel = channel;
    this.begin = begin;
    this.end = end;
    this.offset = 0;
    this.id = id;
    // Reserve a little headroom below the requested cache size, but never
    // allocate fewer than 256 bytes.
    this.cache = ByteBuffer.allocate(Math.max(cacheSize - 384, 256));

    loadCache();
    cache.flip();
    keyBuf.flip();
  }

  /** Refills the cache from the scratch file, starting at the current offset. */
  private void loadCache() throws IOException
  {
    channel.position(begin + offset);
    long leftToRead = end - (begin + offset);
    long bytesToRead;
    if (leftToRead < cache.remaining())
    {
      cache.limit((int) (cache.position() + leftToRead));
      bytesToRead = (int) leftToRead;
    }
    else
    {
      // NOTE(review): this looks inconsistent with leftToRead above, which is
      // computed as end - (begin + offset) — confirm whether (end - offset)
      // is intentional here.
      bytesToRead = Math.min((end - offset), cache.remaining());
    }
    int bytesRead = 0;
    while (bytesRead < bytesToRead)
    {
      bytesRead += channel.read(cache);
    }
    offset += bytesRead;
    indexMgr.addBytesRead(bytesRead);
  }

  /**
   * Returns {@code true} if this buffer has more data.
   *
   * @return {@code true} if this buffer has more data.
   * @throws IOException
   *           If an IO error occurred.
   */
  public boolean hasMoreData() throws IOException
  {
    // Data remains while either the in-memory cache still holds bytes or the
    // file region has not been fully consumed.
    return cache.remaining() != 0 || (begin + offset) < end;
  }

  /**
   * Returns the length of the next key.
   *
   * @return The length of the next key.
   */
  public int getKeyLen()
  {
    return keyBuf.limit();
  }

  /**
   * Returns the next key.
   *
   * @param b
   *          A buffer into which the key should be added.
   */
  public void getKey(ByteBuffer b)
  {
    // NOTE(review): this consumes keyBuf (advances its position) — callers
    // must not expect to read the key twice.
    keyBuf.get(b.array(), 0, keyBuf.limit());
    b.limit(keyBuf.limit());
  }

  /**
   * Returns the index ID of the next record, reading the record lazily if
   * needed.
   *
   * @return The index ID of the next record.
   */
  public Integer getIndexID()
  {
    if (indexID == null)
    {
      try
      {
        getNextRecord();
      }
      catch (IOException ex)
      {
        logger.error(ERR_JEB_IMPORT_BUFFER_IO_ERROR, indexMgr.getBufferFileName());
        throw new RuntimeException(ex);
      }
    }
    return indexID;
  }

  /**
   * Reads the next record from the buffer, skipping any remaining data in the
   * current record.
   *
   * @throws IOException
   *           If an IO error occurred.
   */
  public void getNextRecord() throws IOException
  {
    switch (recordState)
    {
    case START:
      // Nothing to skip.
      break;
    case NEED_INSERT_ID_SET:
      // The previous record's ID sets were not read, so skip them both.
      mergeIDSet(null);
      mergeIDSet(null);
      break;
    case NEED_DELETE_ID_SET:
      // The previous record's delete ID set was not read, so skip it.
      mergeIDSet(null);
      break;
    }

    indexID = getInt();

    ensureData(20);
    byte[] ba = cache.array();
    int p = cache.position();
    int len = PackedInteger.getReadIntLength(ba, p);
    int keyLen = PackedInteger.readInt(ba, p);
    cache.position(p + len);
    if (keyLen > keyBuf.capacity())
    {
      keyBuf = ByteBuffer.allocate(keyLen);
    }
    ensureData(keyLen);
    keyBuf.clear();
    cache.get(keyBuf.array(), 0, keyLen);
    keyBuf.limit(keyLen);

    recordState = RecordState.NEED_INSERT_ID_SET;
  }

  /** Reads a big-endian int from the cache, refilling it first if needed. */
  private int getInt() throws IOException
  {
    ensureData(4);
    return cache.getInt();
  }

  /**
   * Reads the next ID set from the record and merges it with the provided ID
   * set.
   *
   * @param idSet
   *          The ID set to be merged, or {@code null} to skip over the set.
   * @throws IOException
   *           If an IO error occurred.
   */
  public void mergeIDSet(ImportIDSet idSet) throws IOException
  {
    if (recordState == RecordState.START)
    {
      throw new IllegalStateException();
    }

    ensureData(20);
    int p = cache.position();
    byte[] ba = cache.array();
    int len = PackedInteger.getReadIntLength(ba, p);
    int keyCount = PackedInteger.readInt(ba, p);
    p += len;
    cache.position(p);
    for (int k = 0; k < keyCount; k++)
    {
      if (ensureData(9))
      {
        // The cache was refilled: re-read the position (the backing array
        // object is unchanged, only the data inside it moved).
        p = cache.position();
      }
      len = PackedInteger.getReadLongLength(ba, p);
      long l = PackedInteger.readLong(ba, p);
      p += len;
      cache.position(p);

      // idSet will be null if skipping.
      if (idSet != null)
      {
        idSet.addEntryID(l);
      }
    }

    // Advance the record state machine: insert set -> delete set -> done.
    switch (recordState)
    {
    case START:
      throw new IllegalStateException();
    case NEED_INSERT_ID_SET:
      recordState = RecordState.NEED_DELETE_ID_SET;
      break;
    case NEED_DELETE_ID_SET:
      recordState = RecordState.START;
      break;
    }
  }

  /**
   * Ensures at least {@code len} readable bytes are in the cache, refilling
   * from the file if needed. Returns {@code true} if a refill happened (which
   * invalidates previously saved positions).
   */
  private boolean ensureData(int len) throws IOException
  {
    boolean ret = false;
    if (cache.remaining() == 0)
    {
      cache.clear();
      loadCache();
      cache.flip();
      ret = true;
    }
    else if (cache.remaining() < len)
    {
      cache.compact();
      loadCache();
      cache.flip();
      ret = true;
    }
    return ret;
  }

  /**
   * Compares this buffer's current record with the provided key and index ID.
   * <p>
   * NOTE: despite the {@code compare} naming, this is not a three-way
   * comparison. It returns {@code 0} only when both the key and the index ID
   * are equal, and {@code 1} in every other case — it never returns a
   * negative number. Callers may only rely on the zero / non-zero distinction.
   *
   * @param cKey
   *          The key.
   * @param cIndexID
   *          The index ID.
   * @return {@code 0} if this buffer's current key and index ID equal the
   *         provided ones, a positive number otherwise.
   */
  int compare(ByteBuffer cKey, Integer cIndexID)
  {
    // Lazily load the current record if it has not been read yet.
    if (keyBuf.limit() == 0)
    {
      getIndexID();
    }

    final int rc = Importer.indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(), cKey.array(), cKey.limit());
    if (rc == 0)
    {
      return (indexID.intValue() == cIndexID.intValue()) ? 0 : 1;
    }
    return 1;
  }

  /** {@inheritDoc} */
  @Override
  public int compareTo(IndexInputBuffer o)
  {
    // used in remove.
    if (this == o)
    {
      return 0;
    }

    // Lazily load whichever side has not read its current record yet.
    if (keyBuf.limit() == 0)
    {
      getIndexID();
    }
    if (o.keyBuf.limit() == 0)
    {
      o.getIndexID();
    }

    byte[] oKey = o.keyBuf.array();
    int oLen = o.keyBuf.limit();
    int returnCode = Importer.indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(), oKey, oLen);
    if (returnCode == 0)
    {
      returnCode = indexID.intValue() - o.getIndexID().intValue();
      if (returnCode == 0)
      {
        // Fall back to the buffer id so ordering is total and deterministic.
        returnCode = id - o.id;
      }
    }
    return returnCode;
  }
}
opendj3-server-dev/src/server/org/opends/server/backends/pluggable/IndexOutputBuffer.java
New file @@ -0,0 +1,996 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013-2015 ForgeRock AS.
 */
package org.opends.server.backends.pluggable;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.forgerock.opendj.ldap.ByteSequence;

import com.sleepycat.util.PackedInteger;

/**
 * This class represents an index buffer used to store the keys and entry IDs
 * processed from the LDIF file during phase one of an import, or rebuild index
 * process. Each key and ID is stored in a record in the buffer.
 * <p>
 * The records in the buffer are eventually sorted, based on the key, when the
 * maximum size of the buffer is reached and no more records will fit into the
 * buffer. The buffer is scheduled to be flushed to an index scratch file and
 * then re-cycled by the import, or rebuild-index process.
 * </p>
 * <p>
 * The structure of a record in the buffer is the following:
 *
 * <pre>
 * +-------------+-------------+---------+---------+------------+-----------+
 * | record size | INS/DEL bit | indexID | entryID | key length | key bytes |
 * +-------------+-------------+---------+---------+------------+-----------+
 * </pre>
 *
 * The record size is used for fast seeks to quickly "jump" over records.
 * </p>
 * <p>
 * The records are packed as much as possible, to optimize the buffer space.<br>
 * This class is not thread safe.
 * </p>
 */
final class IndexOutputBuffer implements Comparable<IndexOutputBuffer>
{
  /** Enumeration used when sorting a buffer. */
  private enum CompareOp
  {
    LT, GT, LE, GE, EQ
  }

  /** The size of a Java int. A Java int is 32 bits, i.e. 4 bytes. */
  static final int INT_SIZE = 4;

  /**
   * The record overhead. In addition to entryID, key length and key bytes, the
   * record overhead includes the indexID + INS/DEL bit.
   */
  private static final int REC_OVERHEAD = INT_SIZE + 1;

  /** Buffer records are either insert records or delete records. */
  private static final byte DEL = 0, INS = 1;

  /** The size of a buffer. */
  private final int size;
  /**
   * Byte array holding the actual buffer data. The fixed-size record-offset
   * slots grow upward from index 0 while the variable-length records grow
   * downward from the end of the array; the buffer is full when they meet.
   */
  private final byte buffer[];

  /**
   * Used to break a tie (keys equal) when the buffers are being merged
   * for writing to the index scratch file.
   */
  private long id;
  /** OffSet where next key is written. */
  private int keyOffset;
  /** OffSet where next value record is written. */
  private int recordOffset;
  /** Amount of bytes left in the buffer. */
  private int bytesLeft;
  /** Number of keys in the buffer. */
  private int keys;
  /** Used to iterate over the buffer when writing to a scratch file. */
  private int position;

  /** The comparator to use to sort the keys. */
  private ComparatorBuffer<byte[]> comparator;

  /**
   * Used to make sure that an instance of this class is put on the
   * correct scratch file writer work queue for processing.
   */
  private Importer.IndexKey indexKey;

  /** Initial capacity of re-usable buffer used in key compares. */
  private static final int CAP = 32;

  /**
   * This buffer is reused during key compares. Its main purpose is to keep
   * memory footprint as small as possible.
   */
  private ByteBuffer keyBuffer = ByteBuffer.allocate(CAP);

  /**
   * Set to {@code true} if the buffer should not be recycled. Used when the
   * importer/rebuild index process is doing phase one cleanup and flushing
   * buffers not completed.
   */
  private boolean discarded;

  /**
   * Create an instance of an IndexBuffer using the specified size.
   *
   * @param size The size of the underlying byte array.
   */
  public IndexOutputBuffer(int size)
  {
    this.size = size;
    this.buffer = new byte[size];
    this.bytesLeft = size;
    // records are appended from the end of the array backwards
    this.recordOffset = size - 1;
  }

  /**
   * Reset an IndexBuffer so it can be re-cycled.
   */
  public void reset()
  {
    bytesLeft = size;
    keyOffset = 0;
    recordOffset = size - 1;
    keys = 0;
    position = 0;
    comparator = null;
    indexKey = null;
  }

  /**
   * Creates a new poison buffer. Poison buffers are used to stop the processing of import tasks.
   *
   * @return a new poison buffer
   */
  public static IndexOutputBuffer poison()
  {
    return new IndexOutputBuffer(0);
  }

  /**
   * Set the ID of a buffer to the specified value.
   *
   * @param id The value to set the ID to.
   */
  public void setID(long id)
  {
    this.id = id;
  }

  /**
   * Return the ID of a buffer.
   *
   * @return The value of a buffer's ID.
   */
  private long getBufferID()
  {
    return this.id;
  }

  /**
   * Determines if a buffer is a poison buffer. A poison buffer is used to
   * shutdown work queues when import/rebuild index phase one is completed.
   * A poison buffer has a 0 size.
   *
   * @return {@code true} if a buffer is a poison buffer, or {@code false}
   *         otherwise.
   */
  public boolean isPoison()
  {
    return size == 0;
  }

  /**
   * Determines if buffer should be re-cycled by calling {@link #reset()}.
   *
   * @return {@code true} if buffer should be recycled, or {@code false} if it
   *         should not.
   */
  public boolean isDiscarded()
  {
    return discarded;
  }

  /**
   * Sets the discarded flag to {@code true}.
   */
  public void discard()
  {
    discarded = true;
  }

  /**
   * Returns {@code true} if there is enough space available to write the
   * specified byte array in the buffer. It returns {@code false} otherwise.
   *
   * @param kBytes The byte array to check space against.
   * @param id The id value to check space against.
   * @return {@code true} if there is space to write the byte array in a
   *         buffer, or {@code false} otherwise.
   */
  public boolean isSpaceAvailable(ByteSequence kBytes, long id)
  {
    return getRequiredSize(kBytes.length(), id) < bytesLeft;
  }

  /**
   * Set the comparator to be used in the buffer processing to the specified
   * comparator.
   *
   * @param comparator The comparator to set the buffer's comparator to.
   */
  public void setComparator(ComparatorBuffer<byte[]> comparator)
  {
    this.comparator = comparator;
  }

  /**
   * Return a buffer's current position value.
   *
   * @return The buffer's current position value.
   */
  public int getPosition()
  {
    return position;
  }

  /**
   * Set a buffer's position value to the specified position.
   *
   * @param position The value to set the position to.
   */
  public void setPosition(int position)
  {
    this.position = position;
  }

  /**
   * Sort the buffer.
   */
  public void sort()
  {
    sort(0, keys);
  }

  /**
   * Add the specified key byte array and EntryID to the buffer.
   *
   * @param keyBytes The key byte array.
   * @param entryID The EntryID.
   * @param indexID The index ID the record belongs.
   * @param insert <CODE>True</CODE> if key is an insert, false otherwise.
   */
  public void add(ByteSequence keyBytes, EntryID entryID, int indexID, boolean insert)
  {
    // write the record data, but leave the space to write the record size just
    // before it
    recordOffset = addRecord(keyBytes, entryID.longValue(), indexID, insert);
    // then write the returned record size
    writeIntBytes(recordOffset, buffer, keyOffset);
    keyOffset += INT_SIZE;
    // free space is whatever remains between the upward-growing offset slots
    // and the downward-growing records
    bytesLeft = recordOffset - keyOffset;
    keys++;
  }

  /**
   * Writes the full record minus the record size itself.
   *
   * @param key the key bytes to store in the record
   * @param id the entry ID, stored as a packed long
   * @param indexID the ID of the index the record belongs to
   * @param insert {@code true} for an insert record, {@code false} for a delete
   * @return the offset in the buffer where the record starts
   */
  private int addRecord(ByteSequence key, long id, int indexID, boolean insert)
  {
    int retOffset = recordOffset - getRecordSize(key.length(), id);
    int offSet = retOffset;

    // write the INS/DEL bit
    buffer[offSet++] = insert ? INS : DEL;
    // write the indexID
    writeIntBytes(indexID, buffer, offSet);
    offSet += INT_SIZE;
    // write the entryID
    offSet = PackedInteger.writeLong(buffer, offSet, id);
    // write the key length
    offSet = PackedInteger.writeInt(buffer, offSet, key.length());
    // write the key bytes
    key.copyTo(buffer, offSet);
    return retOffset;
  }

  /**
   * Computes the full size of the record.
   *
   * @param keyLen The length of the key of index
   * @param id The entry id
   * @return The size that such record would take in the IndexOutputBuffer
   */
  public static int getRequiredSize(int keyLen, long id)
  {
    // Adds up the key length + key bytes + entryID + indexID + the INS/DEL bit
    // and finally the space needed to store the record size
    return getRecordSize(keyLen, id) + INT_SIZE;
  }

  private static int getRecordSize(int keyLen, long id)
  {
    // Adds up the key length + key bytes + ...
    return PackedInteger.getWriteIntLength(keyLen) + keyLen +
        // ... entryID + (indexID + INS/DEL bit).
        PackedInteger.getWriteLongLength(id) + REC_OVERHEAD;
  }

  /**
   * Write record at specified index to the specified output stream. Used
   * when writing the index scratch files.
   *
   * @param stream The stream to write the record at the index to.
   * @param index The index of the record to write.
   */
  public void writeID(ByteArrayOutputStream stream, int index)
  {
    // record start + REC_OVERHEAD skips the INS/DEL bit and indexID,
    // landing on the packed entryID
    int offSet = getIntegerValue(index * INT_SIZE);
    int len = PackedInteger.getReadLongLength(buffer, offSet + REC_OVERHEAD);
    stream.write(buffer, offSet + REC_OVERHEAD, len);
  }

  /**
   * Return {@code true} if the record specified by the index is an insert
   * record, or {@code false} if it is a delete record.
   *
   * @param index The index of the record.
   *
   * @return {@code true} if the record is an insert record, or {@code false}
   *         if it is a delete record.
   */
  public boolean isInsertRecord(int index)
  {
    int recOffset = getIntegerValue(index * INT_SIZE);
    return buffer[recOffset] != DEL;
  }

  /**
   * Return the size of the key part of the record.
   *
   * @return The size of the key part of the record.
   */
  public int getKeySize()
  {
    int offSet = getIntegerValue(position * INT_SIZE) + REC_OVERHEAD;
    // skip over the packed entryID to reach the key length
    // NOTE(review): writeID() sizes the same field with getReadLongLength();
    // getReadIntLength() works only while entry IDs pack into an int-sized
    // encoding - confirm the two call sites are intentionally different
    offSet += PackedInteger.getReadIntLength(buffer, offSet);
    return PackedInteger.readInt(buffer, offSet);
  }

  /**
   * Return the key value part of a record indicated by the current buffer
   * position.
   *
   * @return byte array containing the key value.
   */
  public byte[] getKey()
  {
    return getKey(position);
  }

  /** Used to minimize memory usage when comparing keys. */
  private ByteBuffer getKeyBuf(int x)
  {
    keyBuffer.clear();
    int offSet = getIntegerValue(x * INT_SIZE) + REC_OVERHEAD;
    offSet += PackedInteger.getReadIntLength(buffer, offSet);
    int keyLen = PackedInteger.readInt(buffer, offSet);
    offSet += PackedInteger.getReadIntLength(buffer, offSet);
    //Re-allocate if the key is bigger than the capacity.
    if(keyLen > keyBuffer.capacity())
    {
      keyBuffer = ByteBuffer.allocate(keyLen);
    }
    keyBuffer.put(buffer, offSet, keyLen);
    keyBuffer.flip();
    return keyBuffer;
  }

  /**
   * Return the key value part of a record specified by the index.
   *
   * @param x index to return.
   * @return byte array containing the key value.
   */
  private byte[] getKey(int x)
  {
    // record start + overhead, then skip packed entryID, read key length,
    // then skip the packed key length itself to reach the key bytes
    int offSet = getIntegerValue(x * INT_SIZE) + REC_OVERHEAD;
    offSet += PackedInteger.getReadIntLength(buffer, offSet);
    int keyLen = PackedInteger.readInt(buffer, offSet);
    offSet += PackedInteger.getReadIntLength(buffer, offSet);
    byte[] key = new byte[keyLen];
    System.arraycopy(buffer, offSet, key, 0, keyLen);
    return key;
  }

  /** Return the index id of the record at slot {@code x}. */
  private int getIndexID(int x)
  {
    // the indexID is stored 1 byte (the INS/DEL bit) after the record start
    return getIntegerValue(getIntegerValue(x * INT_SIZE) + 1);
  }

  /**
   * Return index id associated with the current position's record.
   *
   * @return The index id.
   */
  public int getIndexID()
  {
    return getIntegerValue(getIntegerValue(position * INT_SIZE) + 1);
  }

  /** Compares the records at slots {@code x} and {@code y} using {@code op}. */
  private boolean is(int x, int y, CompareOp op)
  {
    int xoffSet = getIntegerValue(x * INT_SIZE);
    int xIndexID = getIntegerValue(xoffSet + 1);
    xoffSet += REC_OVERHEAD;
    xoffSet += PackedInteger.getReadIntLength(buffer, xoffSet);
    int xKeyLen = PackedInteger.readInt(buffer, xoffSet);
    int xKey = PackedInteger.getReadIntLength(buffer, xoffSet) + xoffSet;
    int yoffSet = getIntegerValue(y * INT_SIZE);
    int yIndexID = getIntegerValue(yoffSet + 1);
    yoffSet += REC_OVERHEAD;
    yoffSet += PackedInteger.getReadIntLength(buffer, yoffSet);
    int yKeyLen = PackedInteger.readInt(buffer, yoffSet);
    int yKey = PackedInteger.getReadIntLength(buffer, yoffSet) + yoffSet;
    return evaluateReturnCode(comparator.compare(buffer, xKey, xKeyLen, xIndexID, yKey, yKeyLen, yIndexID), op);
  }

  /** Compares the record at slot {@code x} with an explicit key/indexID pair using {@code op}. */
  private boolean is(int x, byte[] yKey, CompareOp op, int yIndexID)
  {
    int xoffSet = getIntegerValue(x * INT_SIZE);
    int xIndexID = getIntegerValue(xoffSet + 1);
    xoffSet += REC_OVERHEAD;
    xoffSet += PackedInteger.getReadIntLength(buffer, xoffSet);
    int xKeyLen = PackedInteger.readInt(buffer, xoffSet);
    int xKey = PackedInteger.getReadIntLength(buffer, xoffSet) + xoffSet;
    return evaluateReturnCode(comparator.compare(buffer, xKey, xKeyLen, xIndexID, yKey, yKey.length, yIndexID), op);
  }

  /**
   * Compare the byte array at the current position with the specified one and
   * using the specified index id. It will return {@code true} if the byte
   * array at the current position is equal to the specified byte array as
   * determined by the comparator and the index ID is equal. It will
   * return {@code false} otherwise.
   *
   * @param b The byte array to compare.
   * @param bIndexID The index key.
   * @return <CODE>True</CODE> if the byte arrays are equal.
   */
  public boolean compare(byte[]b, int bIndexID)
  {
    int offset = getIntegerValue(position * INT_SIZE);
    int indexID = getIntegerValue(offset + 1);
    offset += REC_OVERHEAD;
    offset += PackedInteger.getReadIntLength(buffer, offset);
    int keyLen = PackedInteger.readInt(buffer, offset);
    int key = PackedInteger.getReadIntLength(buffer, offset) + offset;
    return comparator.compare(buffer, key, keyLen, b, b.length) == 0
        && indexID == bIndexID;
  }

  /**
   * Compare current IndexBuffer to the specified index buffer using both the
   * comparator and index ID of both buffers.
   *
   * The key at the value of position in both buffers are used in the compare.
   *
   * @param b The IndexBuffer to compare to.
   * @return 0 if the buffers are equal, -1 if the current buffer is less
   *         than the specified buffer, or 1 if it is greater.
   */
  @Override
  public int compareTo(IndexOutputBuffer b)
  {
    final ByteBuffer keyBuf = b.getKeyBuf(b.position);
    int offset = getIntegerValue(position * INT_SIZE);
    int indexID = getIntegerValue(offset + 1);
    offset += REC_OVERHEAD;
    offset += PackedInteger.getReadIntLength(buffer, offset);
    int keyLen = PackedInteger.readInt(buffer, offset);
    int key = PackedInteger.getReadIntLength(buffer, offset) + offset;
    final int cmp = comparator.compare(buffer, key, keyLen, keyBuf.array(), keyBuf.limit());
    if (cmp != 0)
    {
      return cmp;
    }
    final int bIndexID = b.getIndexID();
    if (indexID == bIndexID)
    {
      // This is tested in a tree set remove when a buffer is removed from the tree set.
      // Fall back to the buffer id so distinct buffers never compare equal.
      return compare(this.id, b.getBufferID());
    }
    else if (indexID < bIndexID)
    {
      return -1;
    }
    else
    {
      return 1;
    }
  }

  /** Three-way comparison of two longs (same contract as {@code Long.compare}). */
  private int compare(long l1, long l2)
  {
    if (l1 == l2)
    {
      return 0;
    }
    else if (l1 < l2)
    {
      return -1;
    }
    else
    {
      return 1;
    }
  }

  /**
   * Write a record to specified output stream using the record pointed to by
   * the current position and the specified byte stream of ids.
   *
   * @param dataStream The data output stream to write to.
   *
   * @throws IOException If an I/O error occurs writing the record.
   */
  public void writeKey(DataOutputStream dataStream) throws IOException
  {
    int offSet = getIntegerValue(position * INT_SIZE) + REC_OVERHEAD;
    offSet += PackedInteger.getReadIntLength(buffer, offSet);
    int keyLen = PackedInteger.readInt(buffer, offSet);
    offSet += PackedInteger.getReadIntLength(buffer, offSet);
    dataStream.write(buffer, offSet, keyLen);
  }

  /**
   * Compare the byte array at the current position with the byte array at the
   * specified index.
   *
   * @param i The index pointing to the byte array to compare.
   * @return {@code true} if the byte arrays are equal, or {@code false} otherwise
   */
  public boolean compare(int i)
  {
    return is(i, position, CompareOp.EQ);
  }

  /**
   * Return the current number of keys.
   *
   * @return The number of keys currently in an index buffer.
   */
  public int getNumberKeys()
  {
    return keys;
  }

  /**
   * Return {@code true} if the buffer has more data to process, or
   * {@code false} otherwise. Used when iterating over the buffer writing the
   * scratch index file.
   *
   * @return {@code true} if a buffer has more data to process, or
   *         {@code false} otherwise.
   */
  public boolean hasMoreData()
  {
    return position + 1 < keys;
  }

  /**
   * Advance the position pointer to the next record in the buffer. Used when
   * iterating over the buffer examining keys.
*/ public void nextRecord() { position++; } private void writeIntBytes(int val, byte[] b, int offset) { for (int i = offset + 3; i >= 0; i--) { b[i] = (byte) (val & 0xff); val >>>= 8; } } private int getIntegerValue(int index) { int answer = 0; for (int i = 0; i < INT_SIZE; i++) { byte b = buffer[index + i]; answer <<= 8; answer |= (b & 0xff); } return answer; } private int med3(int a, int b, int c) { return is(a, b, CompareOp.LT) ? (is(b,c,CompareOp.LT) ? b : is(a,c,CompareOp.LT) ? c : a) : (is(b,c,CompareOp.GT) ? b : is(a,c,CompareOp.GT) ? c : a); } private void sort(int off, int len) { if (len < 7) { for (int i=off; i<len+off; i++) { for (int j=i; j>off && is(j-1, j, CompareOp.GT); j--) { swap(j, j-1); } } return; } int m = off + (len >> 1); if (len > 7) { int l = off; int n = off + len - 1; if (len > 40) { int s = len/8; l = med3(l, l+s, l+2*s); m = med3(m-s, m, m+s); n = med3(n-2*s, n-s, n); } m = med3(l, m, n); } byte[] mKey = getKey(m); int mIndexID = getIndexID(m); int a = off, b = a, c = off + len - 1, d = c; while(true) { while (b <= c && is(b, mKey, CompareOp.LE, mIndexID)) { if (is(b, mKey, CompareOp.EQ, mIndexID)) { swap(a++, b); } b++; } while (c >= b && is(c, mKey, CompareOp.GE, mIndexID)) { if (is(c, mKey, CompareOp.EQ, mIndexID)) { swap(c, d--); } c--; } if (b > c) { break; } swap(b++, c--); } // Swap partition elements back to middle int s, n = off + len; s = Math.min(a-off, b-a ); vectorSwap(off, b-s, s); s = Math.min(d-c, n-d-1); vectorSwap(b, n-s, s); s = b - a; // Recursively sort non-partition-elements if (s > 1) { sort(off, s); } s = d - c; if (s > 1) { sort(n-s, s); } } private void swap(int a, int b) { int aOffset = a * INT_SIZE; int bOffset = b * INT_SIZE; int bVal = getIntegerValue(bOffset); System.arraycopy(buffer, aOffset, buffer, bOffset, INT_SIZE); writeIntBytes(bVal, buffer, aOffset); } private void vectorSwap(int a, int b, int n) { for (int i=0; i<n; i++, a++, b++) { swap(a, b); } } private boolean evaluateReturnCode(int rc, 
CompareOp op) { switch(op) { case LT: return rc < 0; case GT: return rc > 0; case LE: return rc <= 0; case GE: return rc >= 0; case EQ: return rc == 0; default: return false; } } /** * Interface that defines two methods used to compare keys used in this * class. The Comparator interface cannot be used in this class, so this * special one is used that knows about the special properties of this class. * * @param <T> object to use in the compare */ public static interface ComparatorBuffer<T> { /** * Compare two offsets in an object, usually a byte array. * * @param o The object. * @param offset The first offset. * @param length The first length. * @param indexID The first index id. * @param otherOffset The second offset. * @param otherLength The second length. * @param otherIndexID The second index id. * @return a negative integer, zero, or a positive integer as the first * offset value is less than, equal to, or greater than the second. */ int compare(T o, int offset, int length, int indexID, int otherOffset, int otherLength, int otherIndexID); /** * Compare an offset in an object with the specified object. * * @param o The first object. * @param offset The first offset. * @param length The first length. * @param indexID The first index id. * @param other The second object. * @param otherLength The length of the second object. * @param otherIndexID The second index id. * @return a negative integer, zero, or a positive integer as the first * offset value is less than, equal to, or greater than the second * object. */ int compare(T o, int offset, int length, int indexID, T other, int otherLength, int otherIndexID); /** * Compare an offset in an object with the specified object. * * @param o The first object. * @param offset The first offset. * @param length The first length. * @param other The second object. * @param otherLen The length of the second object. 
     * @return a negative integer, zero, or a positive integer as the first
     *         offset value is less than, equal to, or greater than the second
     *         object.
     */
    int compare(T o, int offset, int length, T other, int otherLen);
  }

  /**
   * Implementation of ComparatorBuffer interface. Used to compare keys when
   * they are non-DN indexes.
   * <p>
   * NOTE(review): bytes are compared as signed Java bytes, not unsigned; the
   * ordering is internally consistent but differs from unsigned lexicographic
   * order - confirm the scratch-file merge uses this same comparator.
   * </p>
   */
  public static class IndexComparator implements
      IndexOutputBuffer.ComparatorBuffer<byte[]>
  {
    /**
     * Compare two offsets in a byte array using the index compare
     * algorithm. The specified index ID is used in the comparison if the
     * byte arrays are equal.
     *
     * @param b The byte array.
     * @param offset The first offset.
     * @param length The first length.
     * @param indexID The first index id.
     * @param otherOffset The second offset.
     * @param otherLength The second length.
     * @param otherIndexID The second index id.
     * @return a negative integer, zero, or a positive integer as the first
     *         offset value is less than, equal to, or greater than the second.
     */
    @Override
    public int compare(byte[] b, int offset, int length, int indexID,
        int otherOffset, int otherLength, int otherIndexID)
    {
      for(int i = 0; i < length && i < otherLength; i++)
      {
        if(b[offset + i] > b[otherOffset + i])
        {
          return 1;
        }
        else if (b[offset + i] < b[otherOffset + i])
        {
          return -1;
        }
      }
      return compareLengthThenIndexID(length, indexID, otherLength, otherIndexID);
    }

    /**
     * Compare an offset in a byte array with the specified byte array,
     * using the DN compare algorithm. The specified index ID is used in the
     * comparison if the byte arrays are equal.
     *
     * @param b The byte array.
     * @param offset The first offset.
     * @param length The first length.
     * @param indexID The first index id.
     * @param other The second byte array to compare to.
     * @param otherLength The second byte array's length.
     * @param otherIndexID The second index id.
     * @return a negative integer, zero, or a positive integer as the first
     *         offset value is less than, equal to, or greater than the second
     *         byte array.
     */
    @Override
    public int compare(byte[] b, int offset, int length, int indexID,
        byte[] other, int otherLength, int otherIndexID)
    {
      for(int i = 0; i < length && i < otherLength; i++)
      {
        if(b[offset + i] > other[i])
        {
          return 1;
        }
        else if (b[offset + i] < other[i])
        {
          return -1;
        }
      }
      return compareLengthThenIndexID(length, indexID, otherLength, otherIndexID);
    }

    /**
     * The arrays are equal, make sure they are in the same index
     * since multiple suffixes might have the same key.
     */
    private int compareLengthThenIndexID(int length, int indexID, int otherLength, int otherIndexID)
    {
      if (length == otherLength)
      {
        return compare(indexID, otherIndexID);
      }
      else if (length > otherLength)
      {
        return 1;
      }
      else
      {
        return -1;
      }
    }

    /**
     * Compare an offset in a byte array with the specified byte array,
     * using the DN compare algorithm.
     *
     * @param b The byte array.
     * @param offset The first offset.
     * @param length The first length.
     * @param other The second byte array to compare to.
     * @param otherLength The second byte array's length.
     * @return a negative integer, zero, or a positive integer as the first
     *         offset value is less than, equal to, or greater than the second
     *         byte array.
     */
    @Override
    public int compare(byte[] b, int offset, int length, byte[] other, int otherLength)
    {
      for(int i = 0; i < length && i < otherLength; i++)
      {
        if(b[offset + i] > other[i])
        {
          return 1;
        }
        else if (b[offset + i] < other[i])
        {
          return -1;
        }
      }
      return compare(length, otherLength);
    }

    /** Three-way comparison of two ints (same contract as {@code Integer.compare}). */
    private int compare(int i1, int i2)
    {
      if (i1 == i2)
      {
        return 0;
      }
      else if (i1 > i2)
      {
        return 1;
      }
      else
      {
        return -1;
      }
    }
  }

  /**
   * Set the index key associated with an index buffer.
   *
   * @param indexKey The index key.
   */
  public void setIndexKey(Importer.IndexKey indexKey)
  {
    this.indexKey = indexKey;
  }

  /**
   * Return the index key of an index buffer.
   *
   * @return The index buffer's index key.
   */
  public Importer.IndexKey getIndexKey()
  {
    return indexKey;
  }
}
opendj3-server-dev/src/server/org/opends/server/backends/pluggable/RootContainer.java
@@ -310,7 +310,12 @@ timerService.awaitTermination(20, TimeUnit.SECONDS); } private void removeFiles() throws StorageRuntimeException /** * Removes all the files from the rootContainer's directory. * * @throws StorageRuntimeException If a problem occurred */ void removeFiles() throws StorageRuntimeException { if (!backendDirectory.isDirectory()) { opendj3-server-dev/src/server/org/opends/server/backends/pluggable/Suffix.java
New file @@ -0,0 +1,368 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2014-2015 ForgeRock AS
 */
package org.opends.server.backends.pluggable;

import static org.opends.messages.JebMessages.*;
import static org.opends.server.util.ServerConstants.*;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.backends.pluggable.Importer.DNCache;
import org.opends.server.backends.pluggable.spi.ReadableStorage;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DN;

/**
 * The class represents a suffix that is to be loaded during an import, or
 * rebuild index process. Multiple instances of this class can be instantiated
 * during an import to support multiple suffixes in a backend. A rebuild
 * index has only one of these instances.
 */
class Suffix
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  private final List<DN> includeBranches, excludeBranches;
  private final DN baseDN;
  private final EntryContainer srcEntryContainer;
  private final EntryContainer entryContainer;
  // guards parentSet; pendingMap is concurrent and needs no external locking
  private final Object synchObject = new Object();
  /** Bound on the number of recently seen parent DNs kept in {@link #parentSet}. */
  private static final int PARENT_ID_SET_SIZE = 16 * 1024;
  /** DNs whose processing is in flight; waiters block on the associated latch. */
  private final ConcurrentHashMap<DN, CountDownLatch> pendingMap =
      new ConcurrentHashMap<DN, CountDownLatch>();
  /** Cache of parent DNs already known to be processed. */
  private final Set<DN> parentSet = new HashSet<DN>(PARENT_ID_SET_SIZE);
  private DN parentDN;

  /**
   * Creates a suffix instance using the specified parameters.
   *
   * @param entryContainer The entry container pertaining to the suffix.
   * @param srcEntryContainer The original entry container.
   * @param includeBranches The include branches.
   * @param excludeBranches The exclude branches.
   */
  Suffix(EntryContainer entryContainer, EntryContainer srcEntryContainer,
      List<DN> includeBranches, List<DN> excludeBranches)
  {
    this.entryContainer = entryContainer;
    this.srcEntryContainer = srcEntryContainer;
    this.baseDN = entryContainer.getBaseDN();
    if (includeBranches != null)
    {
      this.includeBranches = includeBranches;
    }
    else
    {
      this.includeBranches = new ArrayList<DN>(0);
    }
    if (excludeBranches != null)
    {
      this.excludeBranches = excludeBranches;
    }
    else
    {
      this.excludeBranches = new ArrayList<DN>(0);
    }
  }

  /**
   * Returns the DN2ID instance pertaining to a suffix instance.
   *
   * @return A DN2ID instance that can be used to manipulate the DN2ID database.
   */
  public DN2ID getDN2ID()
  {
    return entryContainer.getDN2ID();
  }

  /**
   * Returns the ID2Entry instance pertaining to a suffix instance.
   *
   * @return A ID2Entry instance that can be used to manipulate the ID2Entry
   *         database.
   */
  public ID2Entry getID2Entry()
  {
    return entryContainer.getID2Entry();
  }

  /**
   * Returns the DN2URI instance pertaining to a suffix instance.
   *
   * @return A DN2URI instance that can be used to manipulate the DN2URI
   *         database.
   */
  public DN2URI getDN2URI()
  {
    return entryContainer.getDN2URI();
  }

  /**
   * Returns the entry container pertaining to a suffix instance.
   *
   * @return The entry container used to create a suffix instance.
   */
  public EntryContainer getEntryContainer()
  {
    return entryContainer;
  }

  /**
   * Return the Attribute Type - Index map used to map an attribute type to an
   * index instance.
   *
   * @return A suffix's Attribute Type - Index map.
   */
  public Map<AttributeType, AttributeIndex> getAttrIndexMap()
  {
    return entryContainer.getAttributeIndexMap();
  }

  /**
   * Make sure the specified parent DN is not in the pending map, blocking
   * until its latch is released if it is.
   *
   * @param parentDN The DN of the parent.
   * @throws InterruptedException If the wait on the latch is interrupted.
   */
  private void assureNotPending(DN parentDN) throws InterruptedException
  {
    final CountDownLatch l = pendingMap.get(parentDN);
    if (l != null)
    {
      l.await();
    }
  }

  /**
   * Add specified DN to the pending map.
   *
   * @param dn The DN to add to the map.
   */
  public void addPending(DN dn)
  {
    pendingMap.putIfAbsent(dn, new CountDownLatch(1));
  }

  /**
   * Remove the specified DN from the pending map, it may not exist if the
   * entries are being migrated so just return.
   *
   * @param dn The DN to remove from the map.
   */
  public void removePending(DN dn)
  {
    CountDownLatch l = pendingMap.remove(dn);
    if(l != null)
    {
      // release any threads blocked in assureNotPending() on this DN
      l.countDown();
    }
  }

  /**
   * Return {@code true} if the specified dn is contained in the parent set, or
   * in the specified DN cache. This would indicate that the parent has already
   * been processed. It returns {@code false} otherwise.
   *
   * It will optionally check the dn2id database for the dn if the specified
   * cleared backend boolean is {@code false}.
   *
   * @param txn The database transaction
   * @param dn The DN to check for.
   * @param dnCache The importer DN cache.
   * @param clearedBackend Set to {@code true} if the import process cleared the
   *                       backend before processing.
   * @return {@code true} if the dn is contained in the parent ID, or
   *         {@code false} otherwise.
   *
   * @throws StorageRuntimeException If an error occurred searching the DN cache, or
   *           dn2id database.
   * @throws InterruptedException If an error occurred processing the pending
   *           map.
   */
  public boolean isParentProcessed(ReadableStorage txn, DN dn, DNCache dnCache, boolean clearedBackend)
      throws StorageRuntimeException, InterruptedException
  {
    // fast path: recently confirmed parents are cached in parentSet
    synchronized(synchObject)
    {
      if(parentSet.contains(dn))
      {
        return true;
      }
    }
    //The DN was not in the parent set. Make sure it isn't pending.
    try
    {
      assureNotPending(dn);
    }
    catch (InterruptedException e)
    {
      // log, then propagate so the import worker can abort
      logger.error(ERR_JEB_IMPORT_LDIF_PENDING_ERR, e.getMessage());
      throw e;
    }
    // Either parent is in the DN cache,
    // or else check the dn2id database for the DN (only if backend wasn't cleared)
    final boolean parentThere = dnCache.contains(dn)
        || (!clearedBackend && getDN2ID().get(txn, dn, false) != null);
    //Add the DN to the parent set if needed.
    if (parentThere)
    {
      synchronized(synchObject)
      {
        if (parentSet.size() >= PARENT_ID_SET_SIZE)
        {
          // evict one (arbitrary, per HashSet iteration order) entry to bound the cache
          Iterator<DN> iterator = parentSet.iterator();
          iterator.next();
          iterator.remove();
        }
        parentSet.add(dn);
      }
    }
    return parentThere;
  }

  /**
   * Sets the trusted status of all of the indexes, vlvIndexes, id2children
   * and id2subtree indexes.
   *
   * @param trusted True if the indexes should be trusted or false
   *                otherwise.
   *
   * @throws StorageRuntimeException If an error occurred setting the indexes to
   *           trusted.
   */
  public void setIndexesTrusted(boolean trusted) throws StorageRuntimeException
  {
    entryContainer.getID2Children().setTrusted(null,trusted);
    entryContainer.getID2Subtree().setTrusted(null, trusted);
    for (AttributeIndex attributeIndex : entryContainer.getAttributeIndexes())
    {
      // the per-matching-rule sub-indexes may be null; setTrusted() skips nulls
      setTrusted(attributeIndex.getEqualityIndex(), trusted);
      setTrusted(attributeIndex.getPresenceIndex(), trusted);
      setTrusted(attributeIndex.getSubstringIndex(), trusted);
      setTrusted(attributeIndex.getOrderingIndex(), trusted);
      setTrusted(attributeIndex.getApproximateIndex(), trusted);
      Map<String, Collection<Index>> exIndexes = attributeIndex.getExtensibleIndexes();
      if(!exIndexes.isEmpty())
      {
        setTrusted(exIndexes.get(EXTENSIBLE_INDEXER_ID_SUBSTRING), trusted);
        setTrusted(exIndexes.get(EXTENSIBLE_INDEXER_ID_SHARED), trusted);
      }
    }
    for(VLVIndex vlvIdx : entryContainer.getVLVIndexes())
    {
      vlvIdx.setTrusted(null, trusted);
    }
  }

  /** Sets the trusted flag on the given index, ignoring a null index. */
  private void setTrusted(Index index, boolean trusted)
  {
    if (index != null)
    {
      index.setTrusted(null, trusted);
    }
  }

  /** Sets the trusted flag on every index of the given collection, ignoring a null collection. */
  private void setTrusted(Collection<Index> subIndexes, boolean trusted)
  {
    if (subIndexes != null)
    {
      for (Index subIndex : subIndexes)
      {
        subIndex.setTrusted(null, trusted);
      }
    }
  }

  /**
   * Get the parent DN of the last entry added to a suffix.
   *
   * @return The parent DN of the last entry added.
   */
  public DN getParentDN()
  {
    return parentDN;
  }

  /**
   * Set the parent DN of the last entry added to a suffix.
   *
   * @param parentDN The parent DN to save.
   */
  public void setParentDN(DN parentDN)
  {
    this.parentDN = parentDN;
  }

  /**
   * Return a src entry container.
   *
   * @return The src entry container.
   */
  public EntryContainer getSrcEntryContainer()
  {
    return this.srcEntryContainer;
  }

  /**
   * Return include branches.
   *
   * @return The include branches.
   */
  public List<DN> getIncludeBranches()
  {
    return this.includeBranches;
  }

  /**
   * Return exclude branches.
   *
   * @return the exclude branches.
   */
  public List<DN> getExcludeBranches()
  {
    return this.excludeBranches;
  }

  /**
   * Return base DN.
   *
   * @return The base DN.
   */
  public DN getBaseDN()
  {
    return this.baseDN;
  }
}