/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.backends.jeb; import static org.opends.server.util.StaticUtils.getFileForPath; import org.opends.server.types.*; import java.util.*; import java.io.*; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Transaction; /** * This class is used to create a VLV index for an import process. * It is used as follows. *
 * startProcessing();
 * processEntry(entry);
 * processEntry(entry);
 * ...
 * stopProcessing();
 * merge();
 * 
*/ public class VLVIndexBuilder implements IndexBuilder { /** * The import context. */ private ImportContext importContext; /** * The vlvIndex database. */ private VLVIndex vlvIndex; /** * The add write buffer. */ TreeMap addBuffer; /** * The delete write buffer. */ TreeMap delBuffer; /** * The write buffer size. */ private int bufferSize; /** * Current output file number. */ private int fileNumber = 0; /** * A unique prefix for temporary files to prevent conflicts. */ private String fileNamePrefix; /** * Indicates whether we are replacing existing data or not. */ private boolean replaceExisting = false; private ByteArrayOutputStream addBytesStream = new ByteArrayOutputStream(); private ByteArrayOutputStream delBytesStream = new ByteArrayOutputStream(); private DataOutputStream addBytesDataStream; private DataOutputStream delBytesDataStream; /** * A file name filter to identify temporary files we have written. */ private FilenameFilter filter = new FilenameFilter() { public boolean accept(File d, String name) { return name.startsWith(fileNamePrefix); } }; /** * Construct an vlvIndex builder. * * @param importContext The import context. * @param vlvIndex The vlvIndex database we are writing. * @param bufferSize The amount of memory available for buffering. */ public VLVIndexBuilder(ImportContext importContext, VLVIndex vlvIndex, long bufferSize) { this.importContext = importContext; this.vlvIndex = vlvIndex; this.bufferSize = (int)bufferSize/100; long tid = Thread.currentThread().getId(); fileNamePrefix = vlvIndex.getName() + "_" + tid + "_"; replaceExisting = importContext.getLDIFImportConfig().appendToExistingData() && importContext.getLDIFImportConfig().replaceExistingEntries(); addBytesDataStream = new DataOutputStream(addBytesStream); delBytesDataStream = new DataOutputStream(delBytesStream); } /** * {@inheritDoc} */ public void startProcessing() { // Clean up any work files left over from a previous run. 
File tempDir = getFileForPath( importContext.getConfig().getImportTempDirectory()); File[] files = tempDir.listFiles(filter); if (files != null) { for (File f : files) { f.delete(); } } addBuffer = new TreeMap(); delBuffer = new TreeMap(); } /** * {@inheritDoc} */ public void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID) throws DatabaseException, IOException, DirectoryException { Transaction txn = null; SortValues newValues = new SortValues(entryID, newEntry, vlvIndex.sortOrder); // Update the vlvIndex for this entry. if (oldEntry != null) { if(vlvIndex.shouldInclude(oldEntry)) { // This is an entry being replaced. SortValues oldValues = new SortValues(entryID, oldEntry, vlvIndex.sortOrder); removeValues(oldValues, entryID); } } if(vlvIndex.shouldInclude(newEntry)) { insertValues(newValues, entryID); } } /** * {@inheritDoc} */ public void stopProcessing() throws IOException { flushBuffer(); } /** * Record the insertion of an entry ID. * @param sortValues The sort values. * @param entryID The entry ID. * @throws IOException If an I/O error occurs while writing an intermediate * file. */ private void insertValues(SortValues sortValues, EntryID entryID) throws IOException { if (addBuffer.size() + delBuffer.size() >= bufferSize) { flushBuffer(); } addBuffer.put(sortValues, entryID); } /** * Record the deletion of an entry ID. * @param sortValues The sort values to remove. * @param entryID The entry ID. * @throws IOException If an I/O error occurs while writing an intermediate * file. */ private void removeValues(SortValues sortValues, EntryID entryID) throws IOException { if (addBuffer.size() + delBuffer.size() >= bufferSize) { flushBuffer(); } delBuffer.remove(sortValues); } /** * Called when the buffer is full. It first sorts the buffer using the same * key comparator used by the vlvIndex database. Then it merges all the * IDs for the same key together and writes each key and its list of IDs * to an intermediate binary file. 
* A list of deleted IDs is only present if we are replacing existing entries. * * @throws IOException If an I/O error occurs while writing an intermediate * file. */ private void flushBuffer() throws IOException { if (addBuffer.size() + delBuffer.size() == 0) { return; } // Start a new file. fileNumber++; String fileName = fileNamePrefix + String.valueOf(fileNumber) + "_add"; File file = new File(getFileForPath( importContext.getConfig().getImportTempDirectory()), fileName); BufferedOutputStream bufferedStream = new BufferedOutputStream(new FileOutputStream(file)); DataOutputStream dataStream = new DataOutputStream(bufferedStream); try { for (SortValues values : addBuffer.keySet()) { dataStream.writeLong(values.getEntryID()); for(AttributeValue value : values.getValues()) { if(value != null) { byte[] valueBytes = value.getValueBytes(); dataStream.writeInt(valueBytes.length); dataStream.write(valueBytes); } else { dataStream.writeInt(0); } } } } finally { dataStream.close(); } if (replaceExisting) { fileName = fileNamePrefix + String.valueOf(fileNumber) + "_del"; file = new File(getFileForPath( importContext.getConfig().getImportTempDirectory()), fileName); bufferedStream = new BufferedOutputStream(new FileOutputStream(file)); dataStream = new DataOutputStream(bufferedStream); try { for (SortValues values : delBuffer.keySet()) { dataStream.writeLong(values.getEntryID()); for(AttributeValue value : values.getValues()) { byte[] valueBytes = value.getValueBytes(); dataStream.writeInt(valueBytes.length); dataStream.write(valueBytes); } } } finally { dataStream.close(); } } addBuffer = new TreeMap(); delBuffer = new TreeMap(); } /** * Get a string that identifies this vlvIndex builder. * * @return A string that identifies this vlvIndex builder. */ public String toString() { return vlvIndex.toString() + " builder"; } }