/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.backends.jeb;

import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.types.Entry;

import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Transaction;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.io.ByteArrayOutputStream;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FilenameFilter;
import java.io.FileOutputStream;
import java.io.IOException;

/**
 * This class is used to create an attribute index for an import process.
 * It is used as follows.
 * <pre>
 * startProcessing();
 * processEntry(entry);
 * processEntry(entry);
 * ...
 * stopProcessing();
 * merge();
 * </pre>
 */
public class IndexBuilder
{
  /**
   * The import context.
   */
  private ImportContext importContext;

  /**
   * The index database.
   */
  private Index index;

  /**
   * The indexer to generate the index keys.
   */
  private Indexer indexer;

  /**
   * The write buffer of pending key/entry-ID modifications, flushed to an
   * intermediate file whenever it fills up.
   */
  ArrayList<IndexMod> buffer;

  /**
   * The write buffer size (maximum number of buffered modifications).
   */
  private int bufferSize;

  /**
   * Current output file number.
   */
  private int fileNumber = 0;

  /**
   * The index entry limit.
   */
  private int entryLimit;

  /**
   * A unique prefix for temporary files to prevent conflicts.
   */
  private String fileNamePrefix;

  /**
   * Indicates whether we are replacing existing data or not.
   */
  private boolean replaceExisting = false;

  // Reusable scratch streams that accumulate the encoded entry IDs added to
  // (and deleted from) the current key while a flush is in progress.
  // reset() preserves the underlying arrays, avoiding reallocation.
  private ByteArrayOutputStream addBytesStream = new ByteArrayOutputStream();
  private ByteArrayOutputStream delBytesStream = new ByteArrayOutputStream();
  private DataOutputStream addBytesDataStream;
  private DataOutputStream delBytesDataStream;

  /**
   * A file name filter to identify temporary files we have written.
   */
  private FilenameFilter filter = new FilenameFilter()
  {
    public boolean accept(File d, String name)
    {
      return name.startsWith(fileNamePrefix);
    }
  };

  /**
   * Construct an index builder.
   *
   * @param importContext The import context.
   * @param index The index database we are writing.
   * @param entryLimit The index entry limit.
   * @param bufferSize The amount of memory available for buffering.
   */
  public IndexBuilder(ImportContext importContext, Index index,
                      int entryLimit, long bufferSize)
  {
    this.importContext = importContext;
    this.index = index;
    this.indexer = index.indexer;
    this.entryLimit = entryLimit;

    // Divide before narrowing to int so that byte budgets larger than
    // Integer.MAX_VALUE do not overflow in the cast.  The divisor of 100
    // converts the memory budget into a buffered-element count
    // (assumes roughly 100 bytes per buffered modification -- TODO confirm).
    this.bufferSize = (int) (bufferSize / 100);

    // Include the thread ID in the prefix so concurrent builder threads
    // never collide on temporary file names.
    long tid = Thread.currentThread().getId();
    fileNamePrefix = importContext.getContainerName() + "_" +
         indexer.toString() + "_" + tid + "_";

    replaceExisting =
         importContext.getLDIFImportConfig().appendToExistingData() &&
         importContext.getLDIFImportConfig().replaceExistingEntries();

    addBytesDataStream = new DataOutputStream(addBytesStream);
    delBytesDataStream = new DataOutputStream(delBytesStream);
  }

  /**
   * This method must be called before this object can process any
   * entries.  It cleans up any temporary files left over from a
   * previous import.
   */
  public void startProcessing()
  {
    // Clean up any work files left over from a previous run.
    File tempDir =
         new File(importContext.getConfig().getImportTempDirectory());
    File[] files = tempDir.listFiles(filter);
    if (files != null)
    {
      for (File f : files)
      {
        f.delete();
      }
    }

    buffer = new ArrayList<IndexMod>(bufferSize);
  }

  /**
   * Indicates that the index thread should process the provided entry.
   *
   * @param oldEntry The existing contents of the entry, or null if this is
   *                 a new entry.
   * @param newEntry The new contents of the entry.
   * @param entryID The entry ID.
   * @throws DatabaseException If an error occurs in the JE database.
   * @throws IOException If an I/O error occurs while writing an intermediate
   *                     file.
   */
  public void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID)
       throws DatabaseException, IOException
  {
    Transaction txn = null;

    // Update the index for this entry.
    if (oldEntry != null)
    {
      // This is an entry being replaced: the indexer computes which keys
      // must gain this ID and which must lose it.
      Set<ASN1OctetString> addKeys = new HashSet<ASN1OctetString>();
      Set<ASN1OctetString> delKeys = new HashSet<ASN1OctetString>();

      indexer.replaceEntry(txn, oldEntry, newEntry, addKeys, delKeys);

      for (ASN1OctetString k : delKeys)
      {
        removeID(k.value(), entryID);
      }

      for (ASN1OctetString k : addKeys)
      {
        insertID(k.value(), entryID);
      }
    }
    else
    {
      // This is a new entry: every generated key gains this ID.
      Set<ASN1OctetString> addKeys = new HashSet<ASN1OctetString>();
      indexer.indexEntry(txn, newEntry, addKeys);

      for (ASN1OctetString k : addKeys)
      {
        insertID(k.value(), entryID);
      }
    }
  }

  /**
   * Indicates that there will be no more updates.
   *
   * @throws IOException If an I/O error occurs while writing an intermediate
   *                     file.
   */
  public void stopProcessing() throws IOException
  {
    flushBuffer();
  }

  /**
   * Get a statistic of the number of keys that reached the entry limit.
   *
   * @return The number of keys that reached the entry limit.
   */
  public int getEntryLimitExceededCount()
  {
    return index.getEntryLimitExceededCount();
  }

  /**
   * Record the insertion of an entry ID.
   *
   * @param key The index key.
   * @param entryID The entry ID.
   * @throws IOException If an I/O error occurs while writing an intermediate
   *                     file.
   */
  private void insertID(byte[] key, EntryID entryID) throws IOException
  {
    if (buffer.size() >= bufferSize)
    {
      flushBuffer();
    }
    IndexMod kav = new IndexMod(key, entryID, false);
    buffer.add(kav);
  }

  /**
   * Record the deletion of an entry ID.
   *
   * @param key The index key.
   * @param entryID The entry ID.
   * @throws IOException If an I/O error occurs while writing an intermediate
   *                     file.
   */
  private void removeID(byte[] key, EntryID entryID) throws IOException
  {
    if (buffer.size() >= bufferSize)
    {
      flushBuffer();
    }
    IndexMod kav = new IndexMod(key, entryID, true);
    buffer.add(kav);
  }

  /**
   * Called when the buffer is full.  It first sorts the buffer using the same
   * key comparator used by the index database.  Then it merges all the
   * IDs for the same key together and writes each key and its list of IDs
   * to an intermediate binary file.
   * A list of deleted IDs is only present if we are replacing existing
   * entries.
   *
   * @throws IOException If an I/O error occurs while writing an intermediate
   *                     file.
   */
  private void flushBuffer() throws IOException
  {
    if (buffer.size() == 0)
    {
      return;
    }

    // Keys must be sorted before we can merge duplicates.
    IndexModComparator comparator;
    if (replaceExisting)
    {
      // The entry IDs may be out of order.
      // We must sort by key and ID.
      comparator = new IndexModComparator(indexer.getComparator(), true);
    }
    else
    {
      // The entry IDs are all new and are therefore already ordered.
      // We just need to sort by key.
      comparator = new IndexModComparator(indexer.getComparator(), false);
    }
    Collections.sort(buffer, comparator);

    // Start a new file.
    fileNumber++;
    String fileName = fileNamePrefix + String.valueOf(fileNumber);
    File file = new File(importContext.getConfig().getImportTempDirectory(),
                         fileName);
    BufferedOutputStream bufferedStream =
         new BufferedOutputStream(new FileOutputStream(file));
    DataOutputStream dataStream = new DataOutputStream(bufferedStream);

    // Reset the byte array output streams but preserve the underlying arrays.
    addBytesStream.reset();
    delBytesStream.reset();

    try
    {
      byte[] currentKey = null;
      for (IndexMod key : buffer)
      {
        byte[] keyString = key.key;
        if (!Arrays.equals(keyString, currentKey))
        {
          // Starting a new key: write out the record for the previous key.
          if (currentKey != null)
          {
            writeKeyRecord(dataStream, currentKey);
          }
          currentKey = keyString;
          addBytesStream.reset();
          delBytesStream.reset();
        }

        // Accumulate this modification's entry ID under the current key.
        if (key.isDelete)
        {
          delBytesDataStream.writeLong(key.value.longValue());
        }
        else
        {
          addBytesDataStream.writeLong(key.value.longValue());
        }
      }

      // Write out the record for the final key, if any.
      if (currentKey != null)
      {
        writeKeyRecord(dataStream, currentKey);
      }

      buffer = new ArrayList<IndexMod>(bufferSize);
    }
    finally
    {
      dataStream.close();
    }
  }

  /**
   * Write a single merged key record to the intermediate file: the key
   * length and bytes, the added-ID bytes preceded by their length, and --
   * only when replacing existing entries -- the deleted-ID bytes preceded
   * by their length.  Reads the accumulated IDs from the shared
   * addBytesStream/delBytesStream scratch buffers.
   *
   * @param dataStream The intermediate file output stream.
   * @param key The index key to write.
   * @throws IOException If an I/O error occurs while writing.
   */
  private void writeKeyRecord(DataOutputStream dataStream, byte[] key)
       throws IOException
  {
    dataStream.writeInt(key.length);
    dataStream.write(key);
    dataStream.writeInt(addBytesStream.size());
    addBytesStream.writeTo(dataStream);
    if (replaceExisting)
    {
      dataStream.writeInt(delBytesStream.size());
      delBytesStream.writeTo(dataStream);
    }
  }

  /**
   * Get a string that identifies this index builder.
   *
   * @return A string that identifies this index builder.
   */
  @Override
  public String toString()
  {
    return indexer.toString();
  }
}