| | |
| | | */ |
| | | package org.opends.server.backends.jeb; |
| | | |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import org.opends.server.types.Entry; |
| | | import static org.opends.server.util.StaticUtils.getFileForPath; |
| | | |
| | | import org.opends.server.types.DirectoryException; |
| | | import com.sleepycat.je.DatabaseException; |
| | | import com.sleepycat.je.Transaction; |
| | | |
| | | import java.util.Arrays; |
| | | import java.util.ArrayList; |
| | | import java.util.Collections; |
| | | import java.util.HashSet; |
| | | import java.util.Set; |
| | | import java.io.ByteArrayOutputStream; |
| | | import java.io.BufferedOutputStream; |
| | | import java.io.DataOutputStream; |
| | | import java.io.File; |
| | | import java.io.FilenameFilter; |
| | | import java.io.FileOutputStream; |
| | | import java.io.IOException; |
| | | |
| | | /** |
| | | * This class is used to create an attribute index for an import process. |
| | | * It is used as follows. |
| | | * <pre> |
| | | * startProcessing(); |
| | | * processEntry(oldEntry, newEntry, entryID); |
| | | * processEntry(oldEntry, newEntry, entryID); |
| | | * ... |
| | | * stopProcessing(); |
| | | * merge(); |
| | | * </pre> |
| | | * The interface that represents an index builder for the import process. |
| | | */ |
| | | public class IndexBuilder |
| | | public interface IndexBuilder |
| | | { |
| | | /** |
| | | * The import context. It supplies the LDIF import configuration and the |
| | | * backend configuration (temporary directory) used by this builder. |
| | | */ |
| | | private ImportContext importContext; |
| | | |
| | | /** |
| | | * The index database. |
| | | */ |
| | | private Index index; |
| | | |
| | | /** |
| | | * The indexer to generate the index keys. Taken from the index in the |
| | | * constructor. |
| | | */ |
| | | private Indexer indexer; |
| | | |
| | | /** |
| | | * The write buffer. Holds the pending index modifications until the next |
| | | * flush; it is allocated by startProcessing() and replaced after each |
| | | * successful flush. |
| | | */ |
| | | ArrayList<IndexMod> buffer; |
| | | |
| | | /** |
| | | * The write buffer size: the number of buffered modifications at which |
| | | * the buffer is flushed before another modification is added. |
| | | */ |
| | | private int bufferSize; |
| | | |
| | | /** |
| | | * Current output file number. Incremented before each intermediate file |
| | | * is created and appended to the file name prefix to form the file name. |
| | | */ |
| | | private int fileNumber = 0; |
| | | |
| | | /** |
| | | * The index entry limit. |
| | | */ |
| | | private int entryLimit; |
| | | |
| | | /** |
| | | * A unique prefix for temporary files to prevent conflicts. Built in the |
| | | * constructor from the index name and the ID of the constructing thread. |
| | | */ |
| | | private String fileNamePrefix; |
| | | |
| | | /** |
| | | * Indicates whether we are replacing existing data or not. Set in the |
| | | * constructor when the import both appends to existing data and replaces |
| | | * existing entries. |
| | | */ |
| | | private boolean replaceExisting = false; |
| | | |
| | | |
| | | /** |
| | | * Reusable byte buffers that accumulate, for the key currently being |
| | | * written by a flush, the entry IDs to add and the entry IDs to delete. |
| | | * They are reset for each new key so the underlying arrays are reused. |
| | | */ |
| | | private ByteArrayOutputStream addBytesStream = new ByteArrayOutputStream(); |
| | | private ByteArrayOutputStream delBytesStream = new ByteArrayOutputStream(); |
| | | |
| | | /** |
| | | * Data stream wrappers around the add and delete byte buffers, used to |
| | | * write each entry ID as a long. Created in the constructor. |
| | | */ |
| | | private DataOutputStream addBytesDataStream; |
| | | private DataOutputStream delBytesDataStream; |
| | | |
| | | /** |
| | | * Selects the temporary files written by this builder: those whose names |
| | | * begin with this builder's file name prefix. |
| | | */ |
| | | private FilenameFilter filter = new FilenameFilter() |
| | | { |
| | |   public boolean accept(File directory, String candidate) |
| | |   { |
| | |     final boolean hasOurPrefix = candidate.startsWith(fileNamePrefix); |
| | |     return hasOurPrefix; |
| | |   } |
| | | }; |
| | | |
| | | /** |
| | | * Construct an index builder. |
| | | * <p> |
| | | * The temporary file name prefix is built from the index name and the ID |
| | | * of the thread calling this constructor. Existing data is treated as |
| | | * being replaced only when the import configuration both appends to |
| | | * existing data and replaces existing entries. |
| | | * |
| | | * @param importContext The import context. |
| | | * @param index The index database we are writing. |
| | | * @param entryLimit The index entry limit. |
| | | * @param bufferSize The amount of memory available for buffering. One |
| | | *                   hundredth of this value is used as the number of |
| | | *                   index modifications held before a flush. |
| | | */ |
| | | public IndexBuilder(ImportContext importContext, |
| | |                     Index index, int entryLimit, long bufferSize) |
| | | { |
| | |   this.importContext = importContext; |
| | |   this.index = index; |
| | |   this.indexer = index.indexer; |
| | |   this.entryLimit = entryLimit; |
| | | |
| | |   // Divide while the value is still a long and only then narrow to int. |
| | |   // A cast binds tighter than division, so "(int)bufferSize/100" would |
| | |   // truncate the long first and yield a wrapped (possibly negative) size |
| | |   // for budgets above Integer.MAX_VALUE. Clamp so the narrowing is exact. |
| | |   this.bufferSize = (int) Math.min(bufferSize / 100, Integer.MAX_VALUE); |
| | | |
| | |   long tid = Thread.currentThread().getId(); |
| | |   fileNamePrefix = index.getName() + "_" + tid + "_"; |
| | |   replaceExisting = |
| | |        importContext.getLDIFImportConfig().appendToExistingData() && |
| | |        importContext.getLDIFImportConfig().replaceExistingEntries(); |
| | |   addBytesDataStream = new DataOutputStream(addBytesStream); |
| | |   delBytesDataStream = new DataOutputStream(delBytesStream); |
| | | } |
| | | |
| | | /** |
| | | * Prepares this builder for use. Must be invoked before any entry is |
| | | * processed. Temporary files in the import temp directory whose names |
| | | * carry this builder's prefix are deleted (deletion failures are not |
| | | * reported), and a fresh write buffer is allocated. |
| | | */ |
| | | public void startProcessing() |
| | | { |
| | |   // Look for work files left behind by an earlier run. |
| | |   File workDir = getFileForPath( |
| | |        importContext.getConfig().getBackendImportTempDirectory()); |
| | |   File[] leftovers = workDir.listFiles(filter); |
| | | |
| | |   // listFiles yields null when the directory cannot be listed. |
| | |   if (leftovers != null) |
| | |   { |
| | |     for (int i = 0; i < leftovers.length; i++) |
| | |     { |
| | |       leftovers[i].delete(); |
| | |     } |
| | |   } |
| | | |
| | |   buffer = new ArrayList<IndexMod>(bufferSize); |
| | | } |
| | | void startProcessing(); |
| | | |
| | | /** |
| | | * Indicates that the index thread should process the provided entry. |
| | | * @param oldEntry The existing contents of the entry, or null if this is |
| | | * a new entry. |
| | | * @param newEntry The new contents of the entry. |
| | | * @param entryID The entry ID. |
| | | * @throws DatabaseException If an error occurs in the JE database. |
| | | * @throws IOException If an I/O error occurs while writing an intermediate |
| | | * file. |
| | | * @throws com.sleepycat.je.DatabaseException If an error occurs in the JE |
| | | * database. |
| | | * @throws java.io.IOException If an I/O error occurs while writing an |
| | | * intermediate file. |
| | | * @throws DirectoryException If an error occurs while processing the entry. |
| | | */ |
| | | public void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID) |
| | |      throws DatabaseException, IOException |
| | | { |
| | |   // The indexer is always invoked without a transaction. |
| | |   final Transaction txn = null; |
| | |   final Set<ASN1OctetString> keysToAdd = new HashSet<ASN1OctetString>(); |
| | | |
| | |   if (oldEntry == null) |
| | |   { |
| | |     // Brand new entry: every generated key is an insertion. |
| | |     indexer.indexEntry(txn, newEntry, keysToAdd); |
| | |   } |
| | |   else |
| | |   { |
| | |     // Replacement: the indexer reports which keys go away and which |
| | |     // are added. Removals are recorded before insertions. |
| | |     final Set<ASN1OctetString> keysToDelete = |
| | |          new HashSet<ASN1OctetString>(); |
| | |     indexer.replaceEntry(txn, oldEntry, newEntry, keysToAdd, keysToDelete); |
| | | |
| | |     for (ASN1OctetString staleKey : keysToDelete) |
| | |     { |
| | |       removeID(staleKey.value(), entryID); |
| | |     } |
| | |   } |
| | | |
| | |   for (ASN1OctetString freshKey : keysToAdd) |
| | |   { |
| | |     insertID(freshKey.value(), entryID); |
| | |   } |
| | | } |
| | | |
| | | |
| | | void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID) |
| | | throws DatabaseException, IOException, DirectoryException; |
| | | |
| | | /** |
| | | * Signals the end of updates. Whatever is still held in the write buffer |
| | | * is flushed to an intermediate file. |
| | | * |
| | | * @throws IOException If an I/O error occurs while writing an intermediate |
| | | *                     file. |
| | | */ |
| | | public void stopProcessing() throws IOException |
| | | { |
| | |   // Flushing is a no-op when nothing is buffered. |
| | |   this.flushBuffer(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Reports how many keys reached the entry limit, as counted by the |
| | | * underlying index. |
| | | * |
| | | * @return The number of keys that reached the entry limit. |
| | | */ |
| | | public int getEntryLimitExceededCount() |
| | | { |
| | |   final int exceededCount = index.getEntryLimitExceededCount(); |
| | |   return exceededCount; |
| | | } |
| | | |
| | | /** |
| | | * Record the insertion of an entry ID. |
| | | * @param key The index key. |
| | | * @param entryID The entry ID. |
| | | * @throws IOException If an I/O error occurs while writing an intermediate |
| | | * file. |
| | | */ |
| | | private void insertID(byte[] key, EntryID entryID) |
| | | throws IOException |
| | | { |
| | | if (buffer.size() >= bufferSize) |
| | | { |
| | | flushBuffer(); |
| | | } |
| | | |
| | | IndexMod kav = new IndexMod(key, entryID, false); |
| | | buffer.add(kav); |
| | | } |
| | | |
| | | /** |
| | | * Record the deletion of an entry ID. |
| | | * @param key The index key. |
| | | * @param entryID The entry ID. |
| | | * @throws IOException If an I/O error occurs while writing an intermediate |
| | | * file. |
| | | */ |
| | | private void removeID(byte[] key, EntryID entryID) |
| | | throws IOException |
| | | { |
| | | if (buffer.size() >= bufferSize) |
| | | { |
| | | flushBuffer(); |
| | | } |
| | | |
| | | IndexMod kav = new IndexMod(key, entryID, true); |
| | | buffer.add(kav); |
| | | } |
| | | |
| | | /** |
| | | * Writes the buffered modifications to a new intermediate binary file. |
| | | * The buffer is sorted with the index's key comparator, the entry IDs |
| | | * belonging to the same key are gathered together, and one record per |
| | | * key is written. Does nothing when the buffer is empty. |
| | | * A list of deleted IDs is only present if we are replacing existing entries. |
| | | * |
| | | * @throws IOException If an I/O error occurs while writing an intermediate |
| | | *                     file. |
| | | */ |
| | | private void flushBuffer() throws IOException |
| | | { |
| | |   if (buffer.size() == 0) |
| | |   { |
| | |     return; |
| | |   } |
| | | |
| | |   // Duplicate keys can only be merged once the buffer is sorted. When |
| | |   // replacing existing entries the IDs may be out of order, so the sort |
| | |   // is by key and ID; otherwise the IDs are new and already ordered, and |
| | |   // sorting by key alone is enough. |
| | |   Collections.sort(buffer, |
| | |        new IndexModComparator(indexer.getComparator(), replaceExisting)); |
| | | |
| | |   // Open the next numbered work file in the import temp directory. |
| | |   fileNumber++; |
| | |   String workFileName = fileNamePrefix + String.valueOf(fileNumber); |
| | |   File workDir = getFileForPath( |
| | |        importContext.getConfig().getBackendImportTempDirectory()); |
| | |   DataOutputStream out = new DataOutputStream( |
| | |        new BufferedOutputStream( |
| | |             new FileOutputStream(new File(workDir, workFileName)))); |
| | | |
| | |   // Empty the ID accumulators while keeping their backing arrays. |
| | |   addBytesStream.reset(); |
| | |   delBytesStream.reset(); |
| | | |
| | |   try |
| | |   { |
| | |     byte[] pendingKey = null; |
| | |     for (IndexMod mod : buffer) |
| | |     { |
| | |       if (!Arrays.equals(mod.key, pendingKey)) |
| | |       { |
| | |         // A different key starts here: emit the record gathered so far. |
| | |         if (pendingKey != null) |
| | |         { |
| | |           writeKeyRecord(out, pendingKey); |
| | |         } |
| | | |
| | |         pendingKey = mod.key; |
| | |         addBytesStream.reset(); |
| | |         delBytesStream.reset(); |
| | |       } |
| | | |
| | |       // Append the ID to the delete list or the add list for this key. |
| | |       DataOutputStream idSink = |
| | |            mod.isDelete ? delBytesDataStream : addBytesDataStream; |
| | |       idSink.writeLong(mod.value.longValue()); |
| | |     } |
| | | |
| | |     // Emit the record for the last key. |
| | |     if (pendingKey != null) |
| | |     { |
| | |       writeKeyRecord(out, pendingKey); |
| | |     } |
| | | |
| | |     buffer = new ArrayList<IndexMod>(bufferSize); |
| | |   } |
| | |   finally |
| | |   { |
| | |     out.close(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Writes one record: the key length, the key bytes, the length of the |
| | | * accumulated added-ID bytes and those bytes, followed (only when |
| | | * replacing existing entries) by the length of the accumulated |
| | | * deleted-ID bytes and those bytes. |
| | | * |
| | | * @param out The intermediate file stream. |
| | | * @param key The key whose accumulated IDs are written. |
| | | * @throws IOException If an I/O error occurs while writing the record. |
| | | */ |
| | | private void writeKeyRecord(DataOutputStream out, byte[] key) |
| | |      throws IOException |
| | | { |
| | |   out.writeInt(key.length); |
| | |   out.write(key); |
| | |   out.writeInt(addBytesStream.size()); |
| | |   addBytesStream.writeTo(out); |
| | |   if (replaceExisting) |
| | |   { |
| | |     out.writeInt(delBytesStream.size()); |
| | |     delBytesStream.writeTo(out); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Describes this index builder using the string form of its indexer. |
| | | * |
| | | * @return A string that identifies this index builder. |
| | | */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |   final String description = indexer.toString(); |
| | |   return description; |
| | | } |
| | | void stopProcessing() throws IOException; |
| | | } |
| | | |