/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.backends.jeb; import org.opends.server.api.DirectoryThread; import org.opends.server.protocols.asn1.ASN1OctetString; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.LDIFImportConfig; import com.sleepycat.je.*; import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.FilenameFilter; import java.io.IOException; import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; import java.util.TreeMap; import java.util.WeakHashMap; import static org.opends.server.loggers.debug.DebugLogger.debugCaught; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import org.opends.server.types.DebugLogLevel; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.loggers.Error.logError; import static org.opends.server.messages.JebMessages.*; /** * A thread to merge a set of intermediate files from an index builder * into an index database. */ public class IndexMergeThread extends DirectoryThread { /** * The buffer size to use when reading data from disk. */ private static final int INPUT_STREAM_BUFFER_SIZE = 65536; /** * The configuration of the JE backend containing the index. */ Config config; /** * The LDIF import configuration, which indicates whether we are * appending to existing data. */ LDIFImportConfig ldifImportConfig; /** * The indexer to generate and compare index keys. */ Indexer indexer; /** * The index database being written. */ Index index; /** * The index entry limit. */ int entryLimit; /** * The name of the index for use in file names and log messages. */ String indexName; /** * Indicates whether we are replacing existing data or not. */ private boolean replaceExisting = false; /** * A weak reference hash map used to cache byte arrays for holding DB keys. */ private WeakHashMap> arrayMap = new WeakHashMap>(); /** * A file name filter to identify temporary files we have written. */ private FilenameFilter filter = new FilenameFilter() { public boolean accept(File d, String name) { return name.startsWith(indexName); } }; /** * Create a new index merge thread. * @param name The name of the index for use in file names and log messages. * @param config The configuration of the JE backend containing the index. * @param ldifImportConfig The LDIF import configuration, which indicates * whether we are appending to existing data. * @param index The index database to be written. * @param entryLimit The configured index entry limit. */ IndexMergeThread(String name, Config config, LDIFImportConfig ldifImportConfig, Index index, int entryLimit) { super("Index Merge Thread " + name); this.indexName = name; this.config = config; this.ldifImportConfig = ldifImportConfig; this.indexer = index.indexer; this.index = index; this.entryLimit = entryLimit; replaceExisting = ldifImportConfig.appendToExistingData() && ldifImportConfig.replaceExistingEntries(); } /** * Run this thread. */ public void run() { try { merge(); } catch (Exception e) { if (debugEnabled()) { debugCaught(DebugLogLevel.ERROR, e); } } } /** * The merge phase builds the index from intermediate files written * during entry processing. Each line of an intermediate file has data for * one index key and the keys are in order. For each index key, the data from * each intermediate file containing a line for that key must be merged and * written to the index. * @throws Exception If an error occurs. */ public void merge() throws Exception { // An ordered map of the current input keys from each file. OctetStringKeyComparator comparator = new OctetStringKeyComparator(indexer.getComparator()); TreeMap inputs = new TreeMap(comparator); // Open all the files. File tempDir = new File(config.getImportTempDirectory()); File[] files = tempDir.listFiles(filter); if (files == null || files.length == 0) { int msgID = MSGID_JEB_INDEX_MERGE_NO_DATA; String message = getMessage(msgID, indexName); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, message, msgID); return; } int msgID = MSGID_JEB_INDEX_MERGE_START; String message = getMessage(msgID, files.length, indexName); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, message, msgID); MergeReader[] readers = new MergeReader[files.length]; Transaction txn = null; DatabaseEntry dbKey = new DatabaseEntry(); DatabaseEntry dbData = new DatabaseEntry(); byte[] mergedBytes = new byte[0]; Longs merged = new Longs(); try { Database db = index.getDatabase(); for (int i = 0; i < files.length; i++) { // Open a reader for this file. BufferedInputStream bufferedStream = new BufferedInputStream(new FileInputStream(files[i]), INPUT_STREAM_BUFFER_SIZE); DataInputStream dis = new DataInputStream(bufferedStream); readers[i] = new MergeReader(dis); // Read a value from each file. readNext(inputs, readers, i); } // Process the lowest input value until done. try { while (true) { ASN1OctetString lowest = inputs.firstKey(); MergeValue mv = inputs.remove(lowest); byte[] keyBytes = mv.getKey(); dbKey.setData(keyBytes); List addValues = mv.getAddValues(); List delValues = mv.getDelValues(); writeMergedValue: { merged.clear(); if (ldifImportConfig.appendToExistingData()) { if (db.get(txn, dbKey, dbData, LockMode.RMW) == OperationStatus.SUCCESS) { if (dbData.getSize() == 0) { // Entry limit already exceeded. break writeMergedValue; } merged.decode(dbData.getData()); } } for (Longs l : addValues) { merged.addAll(l); } if (replaceExisting) { for (Longs l : delValues) { merged.deleteAll(l); } } if (merged.size() > entryLimit) { index.writeKey(txn, dbKey, new EntryIDSet()); } else { mergedBytes = merged.encode(mergedBytes); dbData.setData(mergedBytes); dbData.setSize(merged.encodedSize()); db.put(txn, dbKey, dbData); } LinkedList arrayList = arrayMap.get(keyBytes.length); if (arrayList == null) { arrayList = new LinkedList(); arrayMap.put(keyBytes.length, arrayList); } arrayList.add(keyBytes); } for (int r : mv.getReaders()) { readNext(inputs, readers, r); } } } catch (NoSuchElementException e) { } } catch (Exception e) { if (debugEnabled()) { debugCaught(DebugLogLevel.ERROR, e); } throw e; } finally { // Close the readers. if (readers != null) { for (MergeReader r : readers) { if (r != null) { r.dataInputStream.close(); } } } // Delete all the files. if (files != null) { for (File f : files) { f.delete(); } } } msgID = MSGID_JEB_INDEX_MERGE_COMPLETE; message = getMessage(msgID, indexName); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, message, msgID); } /** * Reads the next line from one of the merge input files. * @param inputs The ordered map of current input keys. * @param readers The array of input readers. * @param reader The index of the input reader we wish to read from. * @throws IOException If an I/O error occurs while reading the input file. */ private void readNext(TreeMap inputs, MergeReader[] readers, int reader) throws IOException { MergeReader mergeReader = readers[reader]; DataInputStream dataInputStream = mergeReader.dataInputStream; int len; try { len = dataInputStream.readInt(); } catch (EOFException e) { // End of file. return; } byte[] keyBytes; LinkedList arrayList = arrayMap.get(len); if (arrayList == null) { keyBytes = new byte[len]; arrayList = new LinkedList(); arrayMap.put(len, arrayList); } else if (arrayList.isEmpty()) { keyBytes = new byte[len]; } else { keyBytes = arrayList.removeFirst(); } dataInputStream.readFully(keyBytes); Longs addData = mergeReader.addData; addData.decode(dataInputStream); Longs delData = mergeReader.delData; if (replaceExisting) { delData.decode(dataInputStream); } // If this key is not yet in the ordered map then insert it, // otherwise merge the data into the existing data for the key. ASN1OctetString mk = new ASN1OctetString(keyBytes); MergeValue mv = inputs.get(mk); if (mv == null) { mv = new MergeValue(readers.length, entryLimit); mv.setKey(keyBytes); inputs.put(mk, mv); } else { arrayList.add(keyBytes); } mv.mergeData(reader, addData, delData); } }