mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.20.2015 5d07ec161328a94de355aa4bf93918a2da5a8602
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -1061,7 +1061,7 @@
    }
  }
  private void importPhaseTwo() throws InterruptedException, ExecutionException
  private void importPhaseTwo() throws Exception
  {
    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
    scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
@@ -1079,7 +1079,7 @@
   * Performs on-disk merge by reading several scratch files at once
   * and write their ordered content into the target indexes.
   */
  private void processIndexFiles() throws InterruptedException, ExecutionException
  private void processIndexFiles() throws Exception
  {
    if (bufferCount.get() == 0)
    {
@@ -1147,20 +1147,30 @@
    Semaphore permits = new Semaphore(buffers);
    // Start DN processing first.
    List<Future<Void>> futures = new LinkedList<>();
    submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
    submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
    getAll(futures);
    Storage storage = rootContainer.getStorage();
    storage.close();
    try (final org.opends.server.backends.pluggable.spi.Importer importer = storage.startImport())
    {
      List<Future<Void>> futures = new LinkedList<>();
      submitIndexDBWriteTasks(DNIndexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
      submitIndexDBWriteTasks(indexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
      getAll(futures);
    }
    finally
    {
      storage.open();
    }
    shutdownAll(dbService);
  }
  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService,
      Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs,
      org.opends.server.backends.pluggable.spi.Importer importer,
      ExecutorService dbService, Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
  {
    for (IndexManager indexMgr : indexMgrs)
    {
      futures.add(dbService.submit(
          new IndexDBWriteTask(rootContainer.getStorage(), indexMgr, permits, buffers, readAheadSize)));
      futures.add(dbService.submit(new IndexDBWriteTask(importer, indexMgr, permits, buffers, readAheadSize)));
    }
  }
@@ -1706,7 +1716,7 @@
   */
  private final class IndexDBWriteTask implements Callable<Void>
  {
    private final Storage storage;
    private final org.opends.server.backends.pluggable.spi.Importer importer;
    private final IndexManager indexMgr;
    private final int cacheSize;
    /** indexID => DNState map */
@@ -1728,10 +1738,10 @@
    /**
     * Creates a new index DB writer.
     *
     * @param importer
     *          The importer
     * @param indexMgr
     *          The index manager.
     * @param storage
     *          Where to store data
     * @param permits
     *          The semaphore used for restricting the number of buffer allocations.
     * @param maxPermits
@@ -1739,9 +1749,10 @@
     * @param cacheSize
     *          The buffer cache size.
     */
    public IndexDBWriteTask(Storage storage, IndexManager indexMgr, Semaphore permits, int maxPermits, int cacheSize)
    public IndexDBWriteTask(org.opends.server.backends.pluggable.spi.Importer importer, IndexManager indexMgr,
        Semaphore permits, int maxPermits, int cacheSize)
    {
      this.storage = storage;
      this.importer = importer;
      this.indexMgr = indexMgr;
      this.permits = permits;
      this.maxPermits = maxPermits;
@@ -1822,7 +1833,7 @@
    }
    /** Finishes this task. */
    private void endWriteTask(WriteableTransaction txn)
    private void endWriteTask(org.opends.server.backends.pluggable.spi.Importer importer)
    {
      isRunning = false;
@@ -1839,8 +1850,9 @@
        {
          for (DNState dnState : dnStateMap.values())
          {
            dnState.flush(txn);
            dnState.finalFlush(importer);
          }
          if (!isCanceled)
          {
            logger.info(NOTE_JEB_IMPORT_LDIF_DN_CLOSE, indexMgr.getDNCount());
@@ -1896,18 +1908,11 @@
    @Override
    public Void call() throws Exception
    {
      storage.write(new WriteOperation()
      {
        @Override
        public void run(WriteableTransaction txn) throws Exception
        {
          call0(txn);
        }
      });
      call0(importer);
      return null;
    }
    private void call0(WriteableTransaction txn) throws Exception
    private void call0(org.opends.server.backends.pluggable.spi.Importer importer) throws Exception
    {
      if (isCanceled)
      {
@@ -1936,7 +1941,7 @@
            {
              if (previousRecord != null)
              {
                addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
                addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
              }
              // this is a new record
@@ -1960,7 +1965,7 @@
          if (previousRecord != null)
          {
            addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
            addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
          }
        }
      }
@@ -1971,7 +1976,7 @@
      }
      finally
      {
        endWriteTask(txn);
        endWriteTask(importer);
      }
    }
@@ -1986,30 +1991,31 @@
      return new ImportIDSet(record.getKey(), newDefinedSet(), index.getIndexEntryLimit());
    }
    private void addToDB(WriteableTransaction txn, int indexID, ImportIDSet insertSet, ImportIDSet deleteSet)
        throws DirectoryException
    private void addToDB(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet insertSet,
        ImportIDSet deleteSet) throws DirectoryException
    {
      keyCount.incrementAndGet();
      if (indexMgr.isDN2ID())
      {
        addDN2ID(txn, indexID, insertSet);
        addDN2ID(importer, indexID, insertSet);
      }
      else
      {
        if (!deleteSet.isDefined() || deleteSet.size() > 0)
        {
          final Index index = indexIDToIndexMap.get(indexID);
          index.importRemove(txn, deleteSet);
          index.importRemove(importer, deleteSet);
        }
        if (!insertSet.isDefined() || insertSet.size() > 0)
        {
          final Index index = indexIDToIndexMap.get(indexID);
          index.importPut(txn, insertSet);
          index.importPut(importer, insertSet);
        }
      }
    }
    private void addDN2ID(WriteableTransaction txn, int indexID, ImportIDSet idSet) throws DirectoryException
    private void addDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet idSet)
        throws DirectoryException
    {
      DNState dnState = dnStateMap.get(indexID);
      if (dnState == null)
@@ -2017,9 +2023,9 @@
        dnState = new DNState(indexIDToECMap.get(indexID));
        dnStateMap.put(indexID, dnState);
      }
      if (dnState.checkParent(txn, idSet))
      if (dnState.checkParent(importer, idSet))
      {
        dnState.writeToDN2ID(txn, idSet.getKey());
        dnState.writeToDN2ID(importer, idSet.getKey());
      }
    }
@@ -2032,7 +2038,7 @@
     * This class is used to by a index DB merge thread performing DN processing
     * to keep track of the state of individual DN2ID index processing.
     */
    final class DNState
    private final class DNState
    {
      private static final int DN_STATE_CACHE_SIZE = 64 * KB;
@@ -2043,8 +2049,9 @@
      private ByteSequence parentDN;
      private final ByteStringBuilder lastDN = new ByteStringBuilder();
      private EntryID parentID, lastID, entryID;
      private long totalNbEntries;
      DNState(EntryContainer entryContainer)
      private DNState(EntryContainer entryContainer)
      {
        this.entryContainer = entryContainer;
        dn2id = entryContainer.getDN2ID().getName();
@@ -2062,7 +2069,8 @@
      }
      /** Why do we still need this if we are checking parents in the first phase? */
      private boolean checkParent(ReadableTransaction txn, ImportIDSet idSet) throws StorageRuntimeException
      boolean checkParent(org.opends.server.backends.pluggable.spi.Importer importer, ImportIDSet idSet)
          throws StorageRuntimeException
      {
        entryID = idSet.iterator().next();
        parentDN = getParent(idSet.getKey());
@@ -2072,7 +2080,7 @@
          // If null is returned then this is a suffix DN.
          if (parentDN != null)
          {
            parentID = get(txn, dn2id, parentDN);
            parentID = get(importer, dn2id, parentDN);
            if (parentID == null)
            {
              // We have a missing parent. Maybe parent checking was turned off?
@@ -2145,43 +2153,53 @@
        return importCfg != null && importCfg.appendToExistingData();
      }
      EntryID get(ReadableTransaction txn, TreeName dn2id, ByteSequence dn) throws StorageRuntimeException
      private EntryID get(org.opends.server.backends.pluggable.spi.Importer importer, TreeName dn2id, ByteSequence dn)
          throws StorageRuntimeException
      {
        ByteString value = txn.read(dn2id, dn);
        ByteString value = importer.read(dn2id, dn);
        return value != null ? new EntryID(value) : null;
      }
      public void writeToDN2ID(WriteableTransaction txn, ByteSequence key) throws DirectoryException
      void writeToDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, ByteSequence key)
          throws DirectoryException
      {
        txn.put(dn2id, key, entryID.toByteString());
        importer.put(dn2id, key, entryID.toByteString());
        indexMgr.addTotDNCount(1);
        if (parentID != null)
        {
          incrementChildrenCounter(txn);
          incrementChildrenCounter(importer);
        }
      }
      private void incrementChildrenCounter(WriteableTransaction txn)
      private void incrementChildrenCounter(org.opends.server.backends.pluggable.spi.Importer importer)
      {
        final AtomicLong counter = getId2childrenCounter();
        counter.incrementAndGet();
        if (id2childrenCountTree.size() > DN_STATE_CACHE_SIZE)
        {
          flush(txn);
          flush(importer);
        }
      }
      private void flush(WriteableTransaction txn)
      private void flush(org.opends.server.backends.pluggable.spi.Importer importer)
      {
        for (Map.Entry<EntryID, AtomicLong> childrenCounter : id2childrenCountTree.entrySet())
        {
          entryContainer.getID2ChildrenCount()
              .addDelta(txn, childrenCounter.getKey(), childrenCounter.getValue().get());
          final EntryID entryID = childrenCounter.getKey();
          final long totalForEntryID = childrenCounter.getValue().get();
          totalNbEntries += totalForEntryID;
          entryContainer.getID2ChildrenCount().importPut(importer, entryID, totalForEntryID);
        }
        id2childrenCountTree.clear();
      }
      void finalFlush(org.opends.server.backends.pluggable.spi.Importer importer)
      {
        flush(importer);
        entryContainer.getID2ChildrenCount().importPutTotalCount(importer, totalNbEntries);
      }
    }
  }
@@ -2957,7 +2975,7 @@
      indexKeyQueueMap.clear();
    }
    private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
    private void rebuildIndexesPhaseTwo() throws Exception
    {
      final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask());
      try