| | |
| | | import java.util.concurrent.BlockingQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.*; |
| | | import com.sleepycat.je.DatabaseException; |
| | | import com.sleepycat.je.Transaction; |
| | | import com.sleepycat.je.LockMode; |
| | | import com.sleepycat.je.DatabaseEntry; |
| | | |
| | | import com.sleepycat.je.*; |
| | | |
| | | /** |
| | | * A thread to process import entries from a queue. Multiple instances of |
| | |
| | | */ |
| | | private boolean stopRequested = false; |
| | | |
| | | //The thread number related to a thread. |
| | | private int threadNumber; |
| | | |
| | | //The substring buffer manager to use. |
| | | private BufferManager bufferMgr; |
| | |
| | | private DatabaseEntry keyData = new DatabaseEntry(); |
| | | private DatabaseEntry data = new DatabaseEntry(); |
| | | ImportIDSet importIDSet = new IntegerImportIDSet(); |
| | | private LinkedHashMap<DN, DNContext> importMap; |
| | | |
| | | /** |
| | | * Create a work thread instance using the specified parameters. |
| | |
| | | * @param threadNumber The thread number. |
| | | * @param bufferMgr The buffer manager to use. |
| | | * @param rootContainer The root container. |
| | | * @param importMap The import map. |
| | | */ |
| | | public WorkThread(BlockingQueue<WorkElement> workQueue, int threadNumber, |
| | | BufferManager bufferMgr, |
| | | RootContainer rootContainer) { |
| | | RootContainer rootContainer, |
| | | LinkedHashMap<DN, DNContext> importMap) { |
| | | super("Import Worker Thread " + threadNumber); |
| | | this.threadNumber = threadNumber; |
| | | this.workQueue = workQueue; |
| | | this.bufferMgr = bufferMgr; |
| | | this.rootContainer = rootContainer; |
| | | this.importMap = importMap; |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | } |
| | | } while (!stopRequested); |
| | | closeIndexCursors(); |
| | | } catch (Exception e) { |
| | | if (debugEnabled()) { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Close all database cursors opened by this thread. |
| | | * |
| | | * @throws DatabaseException If a database error occurs. |
| | | */ |
| | | private void closeIndexCursors() throws DatabaseException { |
| | | for (DNContext ic : importMap.values()) |
| | | { |
| | | ic.getEntryContainer().closeIndexCursors(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Process a work element. |
| | | * |
| | |
| | | */ |
| | | private void process(WorkElement element) |
| | | throws DatabaseException, DirectoryException, JebException { |
| | | Transaction txn = null; |
| | | EntryID entryID; |
| | | if((entryID = processDN2ID(element, txn)) == null) |
| | | if((entryID = processDN2ID(element)) == null) |
| | | return; |
| | | if(!processID2Entry(element, entryID, txn)) |
| | | if(!processID2Entry(element, entryID)) |
| | | return; |
| | | procesID2SCEntry(element, entryID, txn); |
| | | processIndexesEntry(element, entryID, txn); |
| | | procesID2SCEntry(element, entryID); |
| | | processIndexesEntry(element, entryID); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param element The work element. |
| | | * @param existingEntry The existing entry to replace. |
| | | * @param entryID The entry ID to remove from the keys. |
| | | * @param txn A transaction. |
| | | * @throws DatabaseException If a database error occurs. |
| | | */ |
| | | private void |
| | | processIndexesEntryDelete(WorkElement element, Entry existingEntry, |
| | | EntryID entryID, Transaction txn) |
| | | EntryID entryID) |
| | | throws DatabaseException { |
| | | DNContext context = element.getContext(); |
| | | Map<AttributeType, AttributeIndex> attrIndexMap = |
| | |
| | | AttributeIndex attributeIndex = mapEntry.getValue(); |
| | | Index index; |
| | | if((index=attributeIndex.getEqualityIndex()) != null) { |
| | | delete(index, existingEntry, entryID, txn); |
| | | delete(index, existingEntry, entryID); |
| | | } |
| | | if((index=attributeIndex.getPresenceIndex()) != null) { |
| | | delete(index, existingEntry, entryID, txn); |
| | | delete(index, existingEntry, entryID); |
| | | } |
| | | if((index=attributeIndex.getSubstringIndex()) != null) { |
| | | delete(index, existingEntry, entryID, txn); |
| | | delete(index, existingEntry, entryID); |
| | | } |
| | | if((index=attributeIndex.getOrderingIndex()) != null) { |
| | | delete(index, existingEntry, entryID, txn); |
| | | delete(index, existingEntry, entryID); |
| | | } |
| | | if((index=attributeIndex.getApproximateIndex()) != null) { |
| | | delete(index, existingEntry, entryID, txn); |
| | | delete(index, existingEntry, entryID); |
| | | } |
| | | } |
| | | } |
| | |
| | | * |
| | | * @param element The work element. |
| | | * @param entryID The entry ID to process. |
| | | * @param txn A transaction. |
| | | * @throws DatabaseException If an database error occurs. |
| | | */ |
| | | private void |
| | | processIndexesEntry(WorkElement element, EntryID entryID, Transaction txn) |
| | | processIndexesEntry(WorkElement element, EntryID entryID) |
| | | throws DatabaseException { |
| | | Entry entry = element.getEntry(); |
| | | DNContext context = element.getContext(); |
| | |
| | | ldifImportConfig.replaceExistingEntries()) { |
| | | Entry existingEntry = element.getExistingEntry(); |
| | | if(existingEntry != null) { |
| | | processIndexesEntryDelete(element, existingEntry, entryID, txn); |
| | | processIndexesEntryDelete(element, existingEntry, entryID); |
| | | } |
| | | } |
| | | Map<AttributeType, AttributeIndex> attrIndexMap = |
| | |
| | | AttributeIndex attributeIndex = mapEntry.getValue(); |
| | | Index index; |
| | | if((index=attributeIndex.getEqualityIndex()) != null) { |
| | | insert(index, entry, entryID, txn); |
| | | insert(index, entry, entryID); |
| | | } |
| | | if((index=attributeIndex.getPresenceIndex()) != null) { |
| | | insert(index, entry, entryID, txn); |
| | | insert(index, entry, entryID); |
| | | } |
| | | if((index=attributeIndex.getSubstringIndex()) != null) { |
| | | bufferMgr.insert(index,entry, entryID, txn, insertKeySet); |
| | | bufferMgr.insert(index,entry, entryID, insertKeySet); |
| | | } |
| | | if((index=attributeIndex.getOrderingIndex()) != null) { |
| | | insert(index, entry, entryID, txn); |
| | | insert(index, entry, entryID); |
| | | } |
| | | if((index=attributeIndex.getApproximateIndex()) != null) { |
| | | insert(index, entry, entryID, txn); |
| | | insert(index, entry, entryID); |
| | | } |
| | | } |
| | | } |
| | |
| | | * |
| | | * @param element The work element. |
| | | * @param entryID The entry ID to process. |
| | | * @param txn A transaction. |
| | | * @throws DatabaseException If an database error occurs. |
| | | */ |
| | | private void |
| | | procesID2SCEntry(WorkElement element, EntryID entryID, |
| | | Transaction txn) throws DatabaseException { |
| | | procesID2SCEntry(WorkElement element, EntryID entryID) |
| | | throws DatabaseException { |
| | | Entry entry = element.getEntry(); |
| | | DNContext context = element.getContext(); |
| | | LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig(); |
| | |
| | | } |
| | | Index id2children = context.getEntryContainer().getID2Children(); |
| | | Index id2subtree = context.getEntryContainer().getID2Subtree(); |
| | | bufferMgr.insert(id2children, id2subtree, entry, entryID, txn, |
| | | bufferMgr.insert(id2children, id2subtree, entry, entryID, |
| | | childKeySet, subtreeKeySet); |
| | | } |
| | | |
| | |
| | | * @param index The index to insert into. |
| | | * @param entry The entry to generate the keys from. |
| | | * @param entryID The entry ID to insert. |
| | | * @param txn A transaction. |
| | | * @return <CODE>True</CODE> if insert succeeded. |
| | | * @throws DatabaseException If a database error occurs. |
| | | */ |
| | | private boolean |
| | | insert(Index index, Entry entry, EntryID entryID, |
| | | Transaction txn) throws DatabaseException { |
| | | insert(Index index, Entry entry, EntryID entryID) throws DatabaseException { |
| | | insertKeySet.clear(); |
| | | index.indexer.indexEntry(entry, insertKeySet); |
| | | importIDSet.setEntryID(entryID); |
| | | return index.insert(txn, importIDSet, insertKeySet, keyData, data); |
| | | return index.insert(importIDSet, insertKeySet, keyData, data); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param index The index to insert into. |
| | | * @param entry The entry to generate the keys from. |
| | | * @param entryID The entry ID to insert. |
| | | * @param txn A transaction. |
| | | * @throws DatabaseException If a database error occurs. |
| | | */ |
| | | private void |
| | | delete(Index index, Entry entry, EntryID entryID, |
| | | Transaction txn) throws DatabaseException { |
| | | delete(Index index, Entry entry, EntryID entryID) throws DatabaseException { |
| | | delKeySet.clear(); |
| | | index.indexer.indexEntry(entry, delKeySet); |
| | | index.delete(txn, delKeySet, entryID); |
| | | index.delete(null, delKeySet, entryID); |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * @param element The work element containing the entry. |
| | | * @param entryID The entry ID to use as the key. |
| | | * @param txn A transaction. |
| | | * @return <CODE>True</CODE> If the insert succeeded. |
| | | * @throws DatabaseException If a database error occurs. |
| | | * @throws DirectoryException If a directory error occurs. |
| | | */ |
| | | private boolean |
| | | processID2Entry(WorkElement element, EntryID entryID, Transaction txn) |
| | | processID2Entry(WorkElement element, EntryID entryID) |
| | | throws DatabaseException, DirectoryException { |
| | | boolean ret; |
| | | Entry entry = element.getEntry(); |
| | | DNContext context = element.getContext(); |
| | | ID2Entry id2entry = context.getEntryContainer().getID2Entry(); |
| | | DN2URI dn2uri = context.getEntryContainer().getDN2URI(); |
| | | ret=id2entry.put(txn, entryID, entry); |
| | | ret=id2entry.put(null, entryID, entry); |
| | | if(ret) { |
| | | importedCount++; |
| | | LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig(); |
| | |
| | | ldifImportConfig.replaceExistingEntries()) { |
| | | Entry existingEntry = element.getExistingEntry(); |
| | | if(existingEntry != null) { |
| | | dn2uri.replaceEntry(txn, existingEntry, entry); |
| | | dn2uri.replaceEntry(null, existingEntry, entry); |
| | | } |
| | | } else { |
| | | ret= dn2uri.addEntry(txn, entry); |
| | | ret= dn2uri.addEntry(null, entry); |
| | | } |
| | | } |
| | | return ret; |
| | |
| | | * Process entry from work element checking if it's parent exists. |
| | | * |
| | | * @param element The work element containing the entry. |
| | | * @param txn A transaction. |
| | | * @return <CODE>True</CODE> If the insert succeeded. |
| | | * @throws DatabaseException If a database error occurs. |
| | | */ |
| | | private boolean |
| | | processParent(WorkElement element, Transaction txn) |
| | | throws DatabaseException { |
| | | processParent(WorkElement element) throws DatabaseException { |
| | | Entry entry = element.getEntry(); |
| | | DNContext context = element.getContext(); |
| | | LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig(); |
| | |
| | | DN parentDN = context.getEntryContainer().getParentWithinBase(entryDN); |
| | | DN2ID dn2id = context.getEntryContainer().getDN2ID(); |
| | | if (parentDN != null) { |
| | | parentID = context.getParentID(parentDN, dn2id, txn); |
| | | parentID = context.getParentID(parentDN, dn2id); |
| | | if (parentID == null) { |
| | | dn2id.remove(txn, entryDN); |
| | | dn2id.remove(null, entryDN); |
| | | Message msg = |
| | | ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString()); |
| | | context.getLDIFReader().rejectLastEntry(msg); |
| | |
| | | EntryContainer ec = context.getEntryContainer(); |
| | | for (DN dn = ec.getParentWithinBase(parentDN); dn != null; |
| | | dn = ec.getParentWithinBase(dn)) { |
| | | if((nodeID = getAncestorID(dn2id, dn, txn)) == null) { |
| | | if((nodeID = getAncestorID(dn2id, dn)) == null) { |
| | | return false; |
| | | } else { |
| | | IDs.add(nodeID); |
| | |
| | | return true; |
| | | } |
| | | |
| | | private EntryID getAncestorID(DN2ID dn2id, DN dn, Transaction txn) |
| | | private EntryID getAncestorID(DN2ID dn2id, DN dn) |
| | | throws DatabaseException { |
| | | int i=0; |
| | | EntryID nodeID = dn2id.get(txn, dn, LockMode.DEFAULT); |
| | | EntryID nodeID = dn2id.get(null, dn, LockMode.DEFAULT); |
| | | if(nodeID == null) { |
| | | while((nodeID = dn2id.get(txn, dn, LockMode.DEFAULT)) == null) { |
| | | while((nodeID = dn2id.get(null, dn, LockMode.DEFAULT)) == null) { |
| | | try { |
| | | Thread.sleep(50); |
| | | if(i == 3) { |
| | |
| | | * Process the a entry from the work element into the dn2id DB. |
| | | * |
| | | * @param element The work element containing the entry. |
| | | * @param txn A transaction. |
| | | * @return An entry ID. |
| | | * @throws DatabaseException If a database error occurs. |
| | | * @throws JebException If a JEB error occurs. |
| | | */ |
| | | private EntryID |
| | | processDN2ID(WorkElement element, Transaction txn) |
| | | throws DatabaseException, JebException { |
| | | processDN2ID(WorkElement element) throws DatabaseException, JebException { |
| | | Entry entry = element.getEntry(); |
| | | DNContext context = element.getContext(); |
| | | DN2ID dn2id = context.getEntryContainer().getDN2ID(); |
| | | LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig(); |
| | | DN entryDN = entry.getDN(); |
| | | EntryID entryID = dn2id.get(txn, entryDN, LockMode.DEFAULT); |
| | | EntryID entryID = dn2id.get(null, entryDN, LockMode.DEFAULT); |
| | | if (entryID != null) { |
| | | if (ldifImportConfig.appendToExistingData() && |
| | | ldifImportConfig.replaceExistingEntries()) { |
| | | ID2Entry id2entry = context.getEntryContainer().getID2Entry(); |
| | | Entry existingEntry = id2entry.get(txn, entryID, LockMode.DEFAULT); |
| | | Entry existingEntry = id2entry.get(null, entryID, LockMode.DEFAULT); |
| | | element.setExistingEntry(existingEntry); |
| | | } else { |
| | | Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get(); |
| | |
| | | entryID = null; |
| | | } |
| | | } else { |
| | | if(!processParent(element, txn)) |
| | | if(!processParent(element)) |
| | | return null; |
| | | if (ldifImportConfig.appendToExistingData() && |
| | | ldifImportConfig.replaceExistingEntries()) { |
| | |
| | | ArrayList IDs = (ArrayList)entry.getAttachment(); |
| | | entryID = (EntryID)IDs.get(0); |
| | | } |
| | | dn2id.insert(txn, entryDN, entryID); |
| | | dn2id.insert(null, entryDN, entryID); |
| | | } |
| | | context.removePending(entryDN); |
| | | return entryID; |