mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
07.12.2015 90a6ab6c63699343acf3adcd4346bce2f5665bdd
opendj-server-legacy/src/main/java/org/opends/server/backends/jeb/Importer.java
@@ -217,20 +217,18 @@
  private ExecutorService scratchFileWriterService;
  /** Queue of free index buffers -- used to re-cycle index buffers. */
  private final BlockingQueue<IndexOutputBuffer> freeBufferQueue =
      new LinkedBlockingQueue<IndexOutputBuffer>();
  private final BlockingQueue<IndexOutputBuffer> freeBufferQueue = new LinkedBlockingQueue<>();
  /**
   * Map of index keys to index buffers. Used to allocate sorted index buffers
   * to a index writer thread.
   */
  private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueueMap =
      new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>();
  private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueueMap = new ConcurrentHashMap<>();
  /** Map of DB containers to index managers. Used to start phase 2. */
  private final List<IndexManager> indexMgrList = new LinkedList<IndexManager>();
  private final List<IndexManager> indexMgrList = new LinkedList<>();
  /** Map of DB containers to DN-based index managers. Used to start phase 2. */
  private final List<IndexManager> DNIndexMgrList = new LinkedList<IndexManager>();
  private final List<IndexManager> DNIndexMgrList = new LinkedList<>();
  /**
   * Futures used to indicate when the index file writers are done flushing
@@ -244,12 +242,11 @@
  private final List<ScratchFileWriterTask> scratchFileWriterList;
  /** Map of DNs to Suffix objects. */
  private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();
  private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<>();
  /** Map of container ids to database containers. */
  private final ConcurrentHashMap<Integer, Index> idContainerMap = new ConcurrentHashMap<Integer, Index>();
  private final ConcurrentHashMap<Integer, Index> idContainerMap = new ConcurrentHashMap<>();
  /** Map of container ids to entry containers. */
  private final ConcurrentHashMap<Integer, EntryContainer> idECMap =
      new ConcurrentHashMap<Integer, EntryContainer>();
  private final ConcurrentHashMap<Integer, EntryContainer> idECMap = new ConcurrentHashMap<>();
  /** Used to synchronize when a scratch file index writer is first setup. */
  private final Object synObj = new Object();
@@ -309,9 +306,8 @@
    this.rebuildManager = new RebuildIndexManager(rebuildConfig, cfg);
    this.indexCount = rebuildManager.getIndexCount();
    this.clearedBackend = false;
    this.scratchFileWriterList =
        new ArrayList<ScratchFileWriterTask>(indexCount);
    this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<Void>>();
    this.scratchFileWriterList = new ArrayList<>(indexCount);
    this.scratchFileWriterFutures = new CopyOnWriteArrayList<>();
    this.tempDir = getTempDir(cfg, rebuildConfig.getTmpDirectory());
    recursiveDelete(tempDir);
@@ -363,9 +359,8 @@
    this.indexCount = getTotalIndexCount(localDBBackendCfg);
    this.clearedBackend = mustClearBackend(importConfiguration, localDBBackendCfg);
    this.scratchFileWriterList =
        new ArrayList<ScratchFileWriterTask>(indexCount);
    this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<Void>>();
    this.scratchFileWriterList = new ArrayList<>(indexCount);
    this.scratchFileWriterFutures = new CopyOnWriteArrayList<>();
    this.tempDir = getTempDir(localDBBackendCfg, importConfiguration.getTmpDirectory());
    recursiveDelete(tempDir);
@@ -712,8 +707,8 @@
  {
    DN baseDN = entryContainer.getBaseDN();
    EntryContainer sourceEntryContainer = null;
    List<DN> includeBranches = new ArrayList<DN>();
    List<DN> excludeBranches = new ArrayList<DN>();
    List<DN> includeBranches = new ArrayList<>();
    List<DN> excludeBranches = new ArrayList<>();
    if (!importConfiguration.appendToExistingData()
        && !importConfiguration.clearBackend())
@@ -1041,7 +1036,7 @@
    bufferSortService = Executors.newFixedThreadPool(threadCount);
    final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
    final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
    final List<Callable<Void>> tasks = new ArrayList<>(threadCount);
    tasks.add(new MigrateExistingTask());
    getAll(execService.invokeAll(tasks));
    tasks.clear();
@@ -1134,7 +1129,7 @@
    int buffers;
    while (true)
    {
      final List<IndexManager> allIndexMgrs = new ArrayList<IndexManager>(DNIndexMgrList);
      final List<IndexManager> allIndexMgrs = new ArrayList<>(DNIndexMgrList);
      allIndexMgrs.addAll(indexMgrList);
      Collections.sort(allIndexMgrs, Collections.reverseOrder());
@@ -1180,7 +1175,7 @@
    logger.info(NOTE_IMPORT_LDIF_PHASE_TWO_MEM_REPORT, availableMemory, readAheadSize, buffers);
    // Start indexing tasks.
    List<Future<Void>> futures = new LinkedList<Future<Void>>();
    List<Future<Void>> futures = new LinkedList<>();
    ExecutorService dbService = Executors.newFixedThreadPool(dbThreads);
    Semaphore permits = new Semaphore(buffers);
@@ -1354,7 +1349,7 @@
    private List<byte[]> includeBranchesAsBytes(Suffix suffix)
    {
      List<byte[]> includeBranches = new ArrayList<byte[]>(suffix.getIncludeBranches().size());
      List<byte[]> includeBranches = new ArrayList<>(suffix.getIncludeBranches().size());
      for (DN includeBranch : suffix.getIncludeBranches())
      {
        if (includeBranch.isDescendantOf(suffix.getBaseDN()))
@@ -1383,8 +1378,8 @@
   */
  private class AppendReplaceTask extends ImportTask
  {
    private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
    private final Set<ByteString> deleteKeySet = new HashSet<ByteString>();
    private final Set<ByteString> insertKeySet = new HashSet<>();
    private final Set<ByteString> deleteKeySet = new HashSet<>();
    private final EntryInformation entryInfo = new EntryInformation();
    private Entry oldEntry;
    private EntryID entryID;
@@ -1499,8 +1494,8 @@
   */
  private class ImportTask implements Callable<Void>
  {
    private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<IndexKey, IndexOutputBuffer>();
    private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
    private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<>();
    private final Set<ByteString> insertKeySet = new HashSet<>();
    private final EntryInformation entryInfo = new EntryInformation();
    private final IndexKey dnIndexKey = new IndexKey(dnType, ImportIndexType.DN.toString(), 1);
    private DatabaseEntry keyEntry = new DatabaseEntry();
@@ -1650,7 +1645,7 @@
    void flushIndexBuffers() throws InterruptedException, ExecutionException
    {
      final ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
      final ArrayList<Future<Void>> futures = new ArrayList<>();
      Iterator<Map.Entry<IndexKey, IndexOutputBuffer>> it = indexBufferMap.entrySet().iterator();
      while (it.hasNext())
      {
@@ -1744,7 +1739,7 @@
    private final IndexManager indexMgr;
    private final DatabaseEntry dbKey, dbValue;
    private final int cacheSize;
    private final Map<Integer, DNState> dnStateMap = new HashMap<Integer, DNState>();
    private final Map<Integer, DNState> dnStateMap = new HashMap<>();
    private final Semaphore permits;
    private final int maxPermits;
    private final AtomicLong bytesRead = new AtomicLong();
@@ -1843,7 +1838,7 @@
      batchNumber.incrementAndGet();
      // Create all the index buffers for the next batch.
      final NavigableSet<IndexInputBuffer> buffers = new TreeSet<IndexInputBuffer>();
      final NavigableSet<IndexInputBuffer> buffers = new TreeSet<>();
      for (int i = 0; i < permitRequest; i++)
      {
        final long bufferBegin = bufferIndexFile.readLong();
@@ -2394,7 +2389,7 @@
    private final DataOutputStream bufferStream;
    private final DataOutputStream bufferIndexStream;
    private final byte[] tmpArray = new byte[8];
    private final TreeSet<IndexOutputBuffer> indexSortedSet = new TreeSet<IndexOutputBuffer>();
    private final TreeSet<IndexOutputBuffer> indexSortedSet = new TreeSet<>();
    private int insertKeyCount, deleteKeyCount;
    private int bufferCount;
    private boolean poisonSeen;
@@ -2418,7 +2413,7 @@
    public Void call() throws IOException, InterruptedException
    {
      long offset = 0;
      List<IndexOutputBuffer> l = new LinkedList<IndexOutputBuffer>();
      List<IndexOutputBuffer> l = new LinkedList<>();
      try
      {
        while (true)
@@ -2732,8 +2727,7 @@
        {
          indexMgrList.add(indexMgr);
        }
        BlockingQueue<IndexOutputBuffer> newQueue =
            new ArrayBlockingQueue<IndexOutputBuffer>(phaseOneBufferCount);
        BlockingQueue<IndexOutputBuffer> newQueue = new ArrayBlockingQueue<>(phaseOneBufferCount);
        ScratchFileWriterTask indexWriter = new ScratchFileWriterTask(newQueue, indexMgr);
        scratchFileWriterList.add(indexWriter);
        scratchFileWriterFutures.add(scratchFileWriterService.submit(indexWriter));
@@ -2880,24 +2874,18 @@
    /** Rebuild index configuration. */
    private final RebuildConfig rebuildConfig;
    /** Local DB backend configuration. */
    private final LocalDBBackendCfg cfg;
    /** Map of index keys to indexes. */
    private final Map<IndexKey, Index> indexMap =
        new LinkedHashMap<IndexKey, Index>();
    private final Map<IndexKey, Index> indexMap = new LinkedHashMap<>();
    /** Map of index keys to extensible indexes. */
    private final Map<IndexKey, Collection<Index>> extensibleIndexMap =
        new LinkedHashMap<IndexKey, Collection<Index>>();
    private final Map<IndexKey, Collection<Index>> extensibleIndexMap = new LinkedHashMap<>();
    /** List of VLV indexes. */
    private final List<VLVIndex> vlvIndexes = new LinkedList<VLVIndex>();
    private final List<VLVIndex> vlvIndexes = new LinkedList<>();
    /** The DN2ID index. */
    private DN2ID dn2id;
    /** The DN2URI index. */
    private DN2URI dn2uri;
@@ -3267,7 +3255,7 @@
      scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
      bufferSortService = Executors.newFixedThreadPool(threadCount);
      ExecutorService rebuildIndexService = Executors.newFixedThreadPool(threadCount);
      List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
      List<Callable<Void>> tasks = new ArrayList<>(threadCount);
      for (int i = 0; i < threadCount; i++)
      {
        tasks.add(this);