mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

dugan
16.18.2008 0c04328d7c86a6b2d4badb96c79a22aba2fa7eca
opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java
@@ -37,10 +37,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.*;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.*;
/**
 * A thread to process import entries from a queue.  Multiple instances of
@@ -72,8 +70,6 @@
   */
  private boolean stopRequested = false;
  //The thread number related to a thread.
  private int threadNumber;
  //The substring buffer manager to use.
  private BufferManager bufferMgr;
@@ -86,6 +82,7 @@
  private DatabaseEntry keyData = new DatabaseEntry();
  private DatabaseEntry data = new DatabaseEntry();
  ImportIDSet importIDSet = new IntegerImportIDSet();
  private LinkedHashMap<DN, DNContext> importMap;
  /**
   * Create a work thread instance using the specified parameters.
@@ -94,15 +91,17 @@
   * @param threadNumber The thread number.
   * @param bufferMgr  The buffer manager to use.
   * @param rootContainer The root container.
   * @param importMap The import map.
   */
  public WorkThread(BlockingQueue<WorkElement> workQueue, int threadNumber,
                                BufferManager bufferMgr,
                                RootContainer rootContainer) {
                                RootContainer rootContainer,
                                LinkedHashMap<DN, DNContext> importMap) {
    super("Import Worker Thread " + threadNumber);
    this.threadNumber = threadNumber;
    this.workQueue = workQueue;
    this.bufferMgr = bufferMgr;
    this.rootContainer = rootContainer;
    this.importMap = importMap;
  }
  /**
@@ -142,6 +141,7 @@
          }
        }
      } while (!stopRequested);
      closeIndexCursors();
    } catch (Exception e) {
      if (debugEnabled()) {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
@@ -150,6 +150,19 @@
    }
  }
  /**
   * Close all database cursors opened by this thread.
   *
   * @throws DatabaseException If a database error occurs.
   */
  private void closeIndexCursors() throws DatabaseException {
    for (DNContext ic : importMap.values())
    {
      ic.getEntryContainer().closeIndexCursors();
    }
  }
  /**
   * Process a work element.
   *
@@ -161,14 +174,13 @@
   */
  private void process(WorkElement element)
  throws DatabaseException, DirectoryException, JebException {
    Transaction txn = null;
    EntryID entryID;
    if((entryID = processDN2ID(element, txn)) == null)
    if((entryID = processDN2ID(element)) == null)
      return;
    if(!processID2Entry(element, entryID, txn))
    if(!processID2Entry(element, entryID))
      return;
    procesID2SCEntry(element, entryID, txn);
    processIndexesEntry(element, entryID, txn);
    procesID2SCEntry(element, entryID);
    processIndexesEntry(element, entryID);
  }
  /**
@@ -178,12 +190,11 @@
   * @param element The work element.
   * @param existingEntry The existing entry to replace.
   * @param entryID The entry ID to remove from the keys.
   * @param txn A transaction.
   * @throws DatabaseException If a database error occurs.
   */
  private void
  processIndexesEntryDelete(WorkElement element, Entry existingEntry,
                            EntryID entryID, Transaction txn)
                            EntryID entryID)
          throws DatabaseException {
    DNContext context = element.getContext();
    Map<AttributeType, AttributeIndex> attrIndexMap =
@@ -195,19 +206,19 @@
        AttributeIndex attributeIndex = mapEntry.getValue();
        Index index;
        if((index=attributeIndex.getEqualityIndex()) != null) {
          delete(index, existingEntry, entryID, txn);
          delete(index, existingEntry, entryID);
        }
        if((index=attributeIndex.getPresenceIndex()) != null) {
          delete(index, existingEntry, entryID, txn);
          delete(index, existingEntry, entryID);
        }
        if((index=attributeIndex.getSubstringIndex()) != null) {
          delete(index, existingEntry, entryID, txn);
          delete(index, existingEntry, entryID);
        }
        if((index=attributeIndex.getOrderingIndex()) != null) {
          delete(index, existingEntry, entryID, txn);
          delete(index, existingEntry, entryID);
        }
        if((index=attributeIndex.getApproximateIndex()) != null) {
          delete(index, existingEntry, entryID, txn);
          delete(index, existingEntry, entryID);
        }
      }
    }
@@ -218,11 +229,10 @@
   *
   * @param element The work element.
   * @param entryID The entry ID to process.
   * @param txn A transaction.
   * @throws DatabaseException If an database error occurs.
   */
  private void
  processIndexesEntry(WorkElement element, EntryID entryID, Transaction txn)
  processIndexesEntry(WorkElement element, EntryID entryID)
          throws DatabaseException {
    Entry entry = element.getEntry();
    DNContext context = element.getContext();
@@ -231,7 +241,7 @@
            ldifImportConfig.replaceExistingEntries()) {
      Entry existingEntry = element.getExistingEntry();
      if(existingEntry != null) {
          processIndexesEntryDelete(element, existingEntry, entryID, txn);
          processIndexesEntryDelete(element, existingEntry, entryID);
      }
    }
    Map<AttributeType, AttributeIndex> attrIndexMap =
@@ -243,19 +253,19 @@
        AttributeIndex attributeIndex = mapEntry.getValue();
        Index index;
        if((index=attributeIndex.getEqualityIndex()) != null) {
          insert(index, entry, entryID, txn);
          insert(index, entry, entryID);
        }
        if((index=attributeIndex.getPresenceIndex()) != null) {
          insert(index, entry, entryID, txn);
          insert(index, entry, entryID);
        }
        if((index=attributeIndex.getSubstringIndex()) != null) {
          bufferMgr.insert(index,entry, entryID, txn, insertKeySet);
          bufferMgr.insert(index,entry, entryID, insertKeySet);
        }
        if((index=attributeIndex.getOrderingIndex()) != null) {
          insert(index, entry, entryID, txn);
          insert(index, entry, entryID);
        }
        if((index=attributeIndex.getApproximateIndex()) != null) {
          insert(index, entry, entryID, txn);
          insert(index, entry, entryID);
        }
      }
    }
@@ -266,12 +276,11 @@
   *
   * @param element The work element.
   * @param entryID The entry ID to process.
   * @param txn A transaction.
   * @throws DatabaseException If an database error occurs.
   */
  private  void
  procesID2SCEntry(WorkElement element, EntryID entryID,
                   Transaction txn) throws DatabaseException {
  procesID2SCEntry(WorkElement element, EntryID entryID)
          throws DatabaseException {
    Entry entry = element.getEntry();
    DNContext context = element.getContext();
    LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
@@ -281,7 +290,7 @@
    }
    Index id2children = context.getEntryContainer().getID2Children();
    Index id2subtree = context.getEntryContainer().getID2Subtree();
    bufferMgr.insert(id2children, id2subtree, entry, entryID, txn,
    bufferMgr.insert(id2children, id2subtree, entry, entryID,
                    childKeySet, subtreeKeySet);
  }
@@ -292,17 +301,15 @@
   * @param index  The index to insert into.
   * @param entry The entry to generate the keys from.
   * @param entryID The entry ID to insert.
   * @param txn A transaction.
   * @return <CODE>True</CODE> if insert succeeded.
   * @throws DatabaseException If a database error occurs.
   */
  private boolean
  insert(Index index, Entry entry, EntryID entryID,
         Transaction txn) throws DatabaseException {
  insert(Index index, Entry entry, EntryID entryID) throws DatabaseException {
    insertKeySet.clear();
    index.indexer.indexEntry(entry, insertKeySet);
    importIDSet.setEntryID(entryID);
    return index.insert(txn, importIDSet, insertKeySet, keyData, data);
    return index.insert(importIDSet, insertKeySet, keyData, data);
  }
  /**
@@ -312,15 +319,13 @@
   * @param index  The index to insert into.
   * @param entry The entry to generate the keys from.
   * @param entryID The entry ID to insert.
   * @param txn A transaction.
   * @throws DatabaseException If a database error occurs.
   */
  private void
  delete(Index index, Entry entry, EntryID entryID,
         Transaction txn) throws DatabaseException {
  delete(Index index, Entry entry, EntryID entryID) throws DatabaseException {
    delKeySet.clear();
    index.indexer.indexEntry(entry, delKeySet);
    index.delete(txn, delKeySet,  entryID);
    index.delete(null, delKeySet,  entryID);
  }
  /**
@@ -328,20 +333,19 @@
   *
   * @param element The work element containing the entry.
   * @param entryID The entry ID to use as the key.
   * @param txn A transaction.
   * @return <CODE>True</CODE> If the insert succeeded.
   * @throws DatabaseException If a database error occurs.
   * @throws DirectoryException  If a directory error occurs.
   */
  private boolean
  processID2Entry(WorkElement element, EntryID entryID, Transaction txn)
  processID2Entry(WorkElement element, EntryID entryID)
          throws DatabaseException, DirectoryException {
    boolean ret;
    Entry entry = element.getEntry();
    DNContext context = element.getContext();
    ID2Entry id2entry = context.getEntryContainer().getID2Entry();
    DN2URI dn2uri = context.getEntryContainer().getDN2URI();
    ret=id2entry.put(txn, entryID, entry);
    ret=id2entry.put(null, entryID, entry);
    if(ret) {
      importedCount++;
      LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
@@ -349,10 +353,10 @@
              ldifImportConfig.replaceExistingEntries()) {
        Entry existingEntry = element.getExistingEntry();
        if(existingEntry != null) {
          dn2uri.replaceEntry(txn, existingEntry, entry);
          dn2uri.replaceEntry(null, existingEntry, entry);
        }
      } else {
        ret= dn2uri.addEntry(txn, entry);
        ret= dn2uri.addEntry(null, entry);
      }
    }
    return ret;
@@ -362,13 +366,11 @@
   * Process entry from work element checking if it's parent exists.
   *
   * @param element The work element containing the entry.
   * @param txn A transaction.
   * @return <CODE>True</CODE> If the insert succeeded.
   * @throws DatabaseException If a database error occurs.
   */
  private boolean
  processParent(WorkElement element, Transaction txn)
          throws DatabaseException {
  processParent(WorkElement element) throws DatabaseException {
    Entry entry = element.getEntry();
    DNContext context = element.getContext();
    LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
@@ -381,9 +383,9 @@
    DN parentDN = context.getEntryContainer().getParentWithinBase(entryDN);
    DN2ID dn2id = context.getEntryContainer().getDN2ID();
    if (parentDN != null) {
      parentID = context.getParentID(parentDN, dn2id, txn);
      parentID = context.getParentID(parentDN, dn2id);
      if (parentID == null) {
        dn2id.remove(txn, entryDN);
        dn2id.remove(null, entryDN);
        Message msg =
                ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString());
        context.getLDIFReader().rejectLastEntry(msg);
@@ -406,7 +408,7 @@
        EntryContainer ec = context.getEntryContainer();
        for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
             dn = ec.getParentWithinBase(dn)) {
          if((nodeID =  getAncestorID(dn2id, dn, txn)) == null) {
          if((nodeID =  getAncestorID(dn2id, dn)) == null) {
            return false;
          } else {
            IDs.add(nodeID);
@@ -420,12 +422,12 @@
    return true;
  }
  private EntryID getAncestorID(DN2ID dn2id, DN dn, Transaction txn)
  private EntryID getAncestorID(DN2ID dn2id, DN dn)
          throws DatabaseException {
    int i=0;
    EntryID nodeID = dn2id.get(txn, dn, LockMode.DEFAULT);
    EntryID nodeID = dn2id.get(null, dn, LockMode.DEFAULT);
    if(nodeID == null) {
      while((nodeID = dn2id.get(txn, dn, LockMode.DEFAULT)) == null) {
      while((nodeID = dn2id.get(null, dn, LockMode.DEFAULT)) == null) {
        try {
          Thread.sleep(50);
          if(i == 3) {
@@ -444,25 +446,23 @@
   * Process the a entry from the work element into the dn2id DB.
   *
   * @param element The work element containing the entry.
   * @param txn A transaction.
   * @return An entry ID.
   * @throws DatabaseException If a database error occurs.
   * @throws JebException If a JEB error occurs.
   */
  private EntryID
  processDN2ID(WorkElement element, Transaction txn)
          throws DatabaseException, JebException {
  processDN2ID(WorkElement element) throws DatabaseException, JebException {
    Entry entry = element.getEntry();
    DNContext context = element.getContext();
    DN2ID dn2id = context.getEntryContainer().getDN2ID();
    LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
    DN entryDN = entry.getDN();
    EntryID entryID = dn2id.get(txn, entryDN, LockMode.DEFAULT);
    EntryID entryID = dn2id.get(null, entryDN, LockMode.DEFAULT);
    if (entryID != null) {
      if (ldifImportConfig.appendToExistingData() &&
              ldifImportConfig.replaceExistingEntries()) {
        ID2Entry id2entry = context.getEntryContainer().getID2Entry();
        Entry existingEntry = id2entry.get(txn, entryID, LockMode.DEFAULT);
        Entry existingEntry = id2entry.get(null, entryID, LockMode.DEFAULT);
        element.setExistingEntry(existingEntry);
      } else {
        Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
@@ -470,7 +470,7 @@
        entryID = null;
      }
    } else {
      if(!processParent(element, txn))
      if(!processParent(element))
        return null;
      if (ldifImportConfig.appendToExistingData() &&
              ldifImportConfig.replaceExistingEntries()) {
@@ -479,7 +479,7 @@
        ArrayList IDs = (ArrayList)entry.getAttachment();
        entryID = (EntryID)IDs.get(0);
      }
      dn2id.insert(txn, entryDN, entryID);
      dn2id.insert(null, entryDN, entryID);
    }
    context.removePending(entryDN);
    return entryID;