mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
24.01.2015 9e0e249ed5a9b699b884ec432b5de63c74c4bbb0
Code cleanup

Importer.java:
Used java 7 diamond operator
Reorganized fields to put related fields together
1 files modified
182 ■■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java 182 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -156,17 +156,23 @@
  /** The DN attribute type. */
  private static final AttributeType DN_TYPE;
  /** Root container. */
  private final RootContainer rootContainer;
  /** Import configuration. */
  private final LDIFImportConfig importConfiguration;
  private final ServerContext serverContext;
  /** LDIF reader. */
  private ImportLDIFReader reader;
  /** Phase one buffer count. */
  private final AtomicInteger bufferCount = new AtomicInteger(0);
  /** Phase one imported entries count. */
  private final AtomicLong importCount = new AtomicLong(0);
  /** Migrated entry count. */
  private int migratedCount;
  /** Phase one buffer size in bytes. */
  private int bufferSize;
  /** Temp scratch directory. */
  private final File tempDir;
  /** Index count. */
  private final int indexCount;
  /** Thread count. */
@@ -175,22 +181,10 @@
  /** Set to true when validation is skipped. */
  private final boolean skipDNValidation;
  /** Temp scratch directory. */
  private final File tempDir;
  /** Temporary environment used when DN validation is done in first phase. */
  private final DNCache tmpEnv;
  /** Root container. */
  private final RootContainer rootContainer;
  /** Import configuration. */
  private final LDIFImportConfig importConfiguration;
  private final ServerContext serverContext;
  /** LDIF reader. */
  private ImportLDIFReader reader;
  /** Migrated entry count. */
  private int migratedCount;
  /** Size in bytes of temporary env. */
  private long tmpEnvCacheSize;
  /** Available memory at the start of the import. */
@@ -204,26 +198,24 @@
  private ExecutorService scratchFileWriterService;
  /** Queue of free index buffers -- used to re-cycle index buffers. */
  private final BlockingQueue<IndexOutputBuffer> freeBufferQueue =
      new LinkedBlockingQueue<IndexOutputBuffer>();
  private final BlockingQueue<IndexOutputBuffer> freeBufferQueue = new LinkedBlockingQueue<>();
  /**
   * Map of index keys to index buffers. Used to allocate sorted index buffers
   * to a index writer thread.
   */
  private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueueMap =
      new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>();
  private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueueMap = new ConcurrentHashMap<>();
  /** Map of DB containers to index managers. Used to start phase 2. */
  private final List<IndexManager> indexMgrList = new LinkedList<IndexManager>();
  private final List<IndexManager> indexMgrList = new LinkedList<>();
  /** Map of DB containers to DN-based index managers. Used to start phase 2. */
  private final List<IndexManager> DNIndexMgrList = new LinkedList<IndexManager>();
  private final List<IndexManager> DNIndexMgrList = new LinkedList<>();
  /**
   * Futures used to indicate when the index file writers are done flushing
   * their work queues and have exited. End of phase one.
   */
  private final List<Future<Void>> scratchFileWriterFutures;
  private final List<Future<Void>> scratchFileWriterFutures = new CopyOnWriteArrayList<>();
  /**
   * List of index file writer tasks. Used to signal stopScratchFileWriters to
   * the index file writer tasks when the LDIF file has been done.
@@ -231,12 +223,11 @@
  private final List<ScratchFileWriterTask> scratchFileWriterList;
  /** Map of DNs to Suffix objects. */
  private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();
  private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<>();
  /** Map of indexIDs to database containers. */
  private final ConcurrentHashMap<Integer, Index> indexIDToIndexMap = new ConcurrentHashMap<Integer, Index>();
  private final ConcurrentHashMap<Integer, Index> indexIDToIndexMap = new ConcurrentHashMap<>();
  /** Map of indexIDs to entry containers. */
  private final ConcurrentHashMap<Integer, EntryContainer> indexIDToECMap =
      new ConcurrentHashMap<Integer, EntryContainer>();
  private final ConcurrentHashMap<Integer, EntryContainer> indexIDToECMap = new ConcurrentHashMap<>();
  /** Used to synchronize when a scratch file index writer is first setup. */
  private final Object synObj = new Object();
@@ -283,23 +274,16 @@
    this.rootContainer = rootContainer;
    this.importConfiguration = null;
    this.serverContext = serverContext;
    this.tmpEnv = null;
    this.threadCount = 1;
    this.rebuildManager = new RebuildIndexManager(rootContainer.getStorage(), rebuildConfig, cfg);
    this.indexCount = rebuildManager.getIndexCount();
    this.clearedBackend = false;
    this.scratchFileWriterList =
        new ArrayList<ScratchFileWriterTask>(indexCount);
    this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<Void>>();
    this.scratchFileWriterList = new ArrayList<>(indexCount);
    this.tempDir = getTempDir(cfg, rebuildConfig.getTmpDirectory());
    recursiveDelete(tempDir);
    if (!tempDir.exists() && !tempDir.mkdirs())
    {
      throw new InitializationException(ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir));
    }
    this.tempDir = prepareTempDir(cfg, rebuildConfig.getTmpDirectory());
    computeMemoryRequirements();
    this.skipDNValidation = true;
    initializeDBEnv();
    this.tmpEnv = null;
  }
  /**
@@ -337,19 +321,12 @@
    this.indexCount = getTotalIndexCount(backendCfg);
    this.clearedBackend = mustClearBackend(importConfiguration, backendCfg);
    this.scratchFileWriterList =
        new ArrayList<ScratchFileWriterTask>(indexCount);
    this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<Void>>();
    this.scratchFileWriterList = new ArrayList<>(indexCount);
    this.tempDir = getTempDir(backendCfg, importConfiguration.getTmpDirectory());
    recursiveDelete(tempDir);
    if (!tempDir.exists() && !tempDir.mkdirs())
    {
      throw new InitializationException(ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir));
    }
    this.tempDir = prepareTempDir(backendCfg, importConfiguration.getTmpDirectory());
    computeMemoryRequirements();
    skipDNValidation = importConfiguration.getSkipDNValidation();
    initializeDBEnv();
    // Set up temporary environment.
    if (!skipDNValidation)
    {
@@ -363,6 +340,18 @@
    }
  }
  private File prepareTempDir(PluggableBackendCfg backendCfg, String tmpDirectory) throws InitializationException
  {
    File parentDir = getFileForPath(tmpDirectory != null ? tmpDirectory : DEFAULT_TMP_DIR);
    File tempDir = new File(parentDir, backendCfg.getBackendId());
    recursiveDelete(tempDir);
    if (!tempDir.exists() && !tempDir.mkdirs())
    {
      throw new InitializationException(ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir));
    }
    return tempDir;
  }
  /**
   * Returns whether the backend must be cleared.
   *
@@ -384,20 +373,6 @@
     */
  }
  private static File getTempDir(PluggableBackendCfg backendCfg, String tmpDirectory)
  {
    File parentDir;
    if (tmpDirectory != null)
    {
      parentDir = getFileForPath(tmpDirectory);
    }
    else
    {
      parentDir = getFileForPath(DEFAULT_TMP_DIR);
    }
    return new File(parentDir, backendCfg.getBackendId());
  }
  private static int getTotalIndexCount(PluggableBackendCfg backendCfg) throws ConfigException
  {
    int indexes = 2; // dn2id, dn2uri
@@ -445,12 +420,12 @@
  }
  /**
   * Calculate buffer sizes and initialize JEB properties based on memory.
   * Calculate buffer sizes and initialize properties based on memory.
   *
   * @throws InitializationException
   *           If a problem occurs during calculation.
   */
  private void initializeDBEnv() throws InitializationException
  private void computeMemoryRequirements() throws InitializationException
  {
    // Calculate amount of usable memory. This will need to take into account
    // various fudge factors, including the number of IO buffers used by the
@@ -462,8 +437,7 @@
    // We need caching when doing DN validation or rebuilding indexes.
    if (!skipDNValidation || rebuildManager != null)
    {
      // No DN validation: calculate memory for DB cache, DN2ID temporary cache,
      // and buffers.
      // No DN validation: calculate memory for DB cache, DN2ID temporary cache, and buffers.
      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
      {
        dbCacheSize = 500 * KB;
@@ -672,8 +646,8 @@
  {
    DN baseDN = entryContainer.getBaseDN();
    EntryContainer sourceEntryContainer = null;
    List<DN> includeBranches = new ArrayList<DN>();
    List<DN> excludeBranches = new ArrayList<DN>();
    List<DN> includeBranches = new ArrayList<>();
    List<DN> excludeBranches = new ArrayList<>();
    if (!importConfiguration.appendToExistingData()
        && !importConfiguration.clearBackend())
@@ -918,11 +892,11 @@
      final long startTime = System.currentTimeMillis();
      importPhaseOne();
      final long phaseOneFinishTime = System.currentTimeMillis();
      if (!skipDNValidation)
      {
        tmpEnv.shutdown();
      }
      if (isCanceled)
      {
        throw new InterruptedException("Import processing canceled.");
@@ -1055,7 +1029,7 @@
    final Storage storage = rootContainer.getStorage();
    execService.submit(new MigrateExistingTask(storage)).get();
    final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
    final List<Callable<Void>> tasks = new ArrayList<>(threadCount);
    if (importConfiguration.appendToExistingData()
        && importConfiguration.replaceExistingEntries())
    {
@@ -1147,7 +1121,7 @@
    int buffers;
    while (true)
    {
      final List<IndexManager> allIndexMgrs = new ArrayList<IndexManager>(DNIndexMgrList);
      final List<IndexManager> allIndexMgrs = new ArrayList<>(DNIndexMgrList);
      allIndexMgrs.addAll(indexMgrList);
      Collections.sort(allIndexMgrs, Collections.reverseOrder());
@@ -1197,7 +1171,7 @@
    Semaphore permits = new Semaphore(buffers);
    // Start DN processing first.
    List<Future<Void>> futures = new LinkedList<Future<Void>>();
    List<Future<Void>> futures = new LinkedList<>();
    submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
    submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
    getAll(futures);
@@ -1366,7 +1340,7 @@
    private List<ByteString> includeBranchesAsBytes(Suffix suffix)
    {
      List<ByteString> includeBranches = new ArrayList<ByteString>(suffix.getIncludeBranches().size());
      List<ByteString> includeBranches = new ArrayList<>(suffix.getIncludeBranches().size());
      for (DN includeBranch : suffix.getIncludeBranches())
      {
        if (includeBranch.isDescendantOf(suffix.getBaseDN()))
@@ -1388,8 +1362,8 @@
      super(storage);
    }
    private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
    private final Set<ByteString> deleteKeySet = new HashSet<ByteString>();
    private final Set<ByteString> insertKeySet = new HashSet<>();
    private final Set<ByteString> deleteKeySet = new HashSet<>();
    private final EntryInformation entryInfo = new EntryInformation();
    private Entry oldEntry;
    private EntryID entryID;
@@ -1426,16 +1400,17 @@
      }
    }
    void processEntry(WriteableTransaction txn, Entry entry, Suffix suffix) throws DirectoryException,
        StorageRuntimeException, InterruptedException
    void processEntry(WriteableTransaction txn, Entry entry, Suffix suffix)
        throws DirectoryException, StorageRuntimeException, InterruptedException
    {
      DN entryDN = entry.getName();
      DN2ID dn2id = suffix.getDN2ID();
      EntryID oldID = dn2id.get(txn, entryDN);
      EntryID oldID = suffix.getDN2ID().get(txn, entryDN);
      if (oldID != null)
      {
        oldEntry = suffix.getID2Entry().get(txn, oldID);
      }
      if (oldEntry == null)
      {
        if (!skipDNValidation && !dnSanityCheck(txn, entryDN, entry, suffix))
@@ -1451,6 +1426,7 @@
        suffix.removePending(entryDN);
        entryID = oldID;
      }
      processDN2URI(txn, suffix, oldEntry, entry);
      suffix.getID2Entry().put(txn, entryID, entry);
      if (oldEntry != null)
@@ -1503,8 +1479,8 @@
  private class ImportTask implements Callable<Void>
  {
    private final Storage storage;
    private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<IndexKey, IndexOutputBuffer>();
    private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
    private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<>();
    private final Set<ByteString> insertKeySet = new HashSet<>();
    private final EntryInformation entryInfo = new EntryInformation();
    private final IndexKey dnIndexKey = new IndexKey(DN_TYPE, DN2ID_INDEX_NAME, 1);
@@ -1666,7 +1642,7 @@
    void flushIndexBuffers() throws InterruptedException, ExecutionException
    {
      final ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
      final ArrayList<Future<Void>> futures = new ArrayList<>();
      for (IndexOutputBuffer indexBuffer : indexBufferMap.values())
      {
        indexBuffer.discard();
@@ -1758,7 +1734,7 @@
    private final IndexManager indexMgr;
    private final int cacheSize;
    /** indexID => DNState map */
    private final Map<Integer, DNState> dnStateMap = new HashMap<Integer, DNState>();
    private final Map<Integer, DNState> dnStateMap = new HashMap<>();
    private final Semaphore permits;
    private final int maxPermits;
    private final AtomicLong bytesRead = new AtomicLong();
@@ -1856,7 +1832,7 @@
      batchNumber.incrementAndGet();
      // Create all the index buffers for the next batch.
      final NavigableSet<IndexInputBuffer> buffers = new TreeSet<IndexInputBuffer>();
      final NavigableSet<IndexInputBuffer> buffers = new TreeSet<>();
      for (int i = 0; i < permitRequest; i++)
      {
        final long bufferBegin = bufferIndexFile.readLong();
@@ -2086,8 +2062,8 @@
      private final EntryContainer entryContainer;
      private final TreeName dn2id;
      private final TreeMap<ByteString, EntryID> parentIDMap = new TreeMap<ByteString, EntryID>();
      private final Map<EntryID, AtomicLong> id2childrenCountTree = new TreeMap<EntryID, AtomicLong>();
      private final TreeMap<ByteString, EntryID> parentIDMap = new TreeMap<>();
      private final Map<EntryID, AtomicLong> id2childrenCountTree = new TreeMap<>();
      private ByteSequence parentDN;
      private final ByteStringBuilder lastDN = new ByteStringBuilder();
      private EntryID parentID, lastID, entryID;
@@ -2228,6 +2204,8 @@
        }
        id2childrenCountTree.clear();
      }
    }
  }
@@ -2248,7 +2226,7 @@
    private final ByteArrayOutputStream deleteByteStream = new ByteArrayOutputStream(2 * bufferSize);
    private final DataOutputStream bufferStream;
    private final DataOutputStream bufferIndexStream;
    private final TreeSet<IndexOutputBuffer> indexSortedSet = new TreeSet<IndexOutputBuffer>();
    private final TreeSet<IndexOutputBuffer> indexSortedSet = new TreeSet<>();
    private int insertKeyCount, deleteKeyCount;
    private int bufferCount;
    private boolean poisonSeen;
@@ -2272,7 +2250,7 @@
    public Void call() throws IOException, InterruptedException
    {
      long offset = 0;
      List<IndexOutputBuffer> l = new LinkedList<IndexOutputBuffer>();
      List<IndexOutputBuffer> l = new LinkedList<>();
      try
      {
        while (true)
@@ -2548,8 +2526,7 @@
        {
          indexMgrList.add(indexMgr);
        }
        BlockingQueue<IndexOutputBuffer> newQueue =
            new ArrayBlockingQueue<IndexOutputBuffer>(phaseOneBufferCount);
        BlockingQueue<IndexOutputBuffer> newQueue = new ArrayBlockingQueue<>(phaseOneBufferCount);
        ScratchFileWriterTask indexWriter = new ScratchFileWriterTask(newQueue, indexMgr);
        scratchFileWriterList.add(indexWriter);
        scratchFileWriterFutures.add(scratchFileWriterService.submit(indexWriter));
@@ -2695,16 +2672,18 @@
    /** Rebuild index configuration. */
    private final RebuildConfig rebuildConfig;
    /** Local DB backend configuration. */
    private final PluggableBackendCfg cfg;
    /** Map of index keys to indexes. */
    private final Map<IndexKey, MatchingRuleIndex> indexMap =
        new LinkedHashMap<IndexKey, MatchingRuleIndex>();
    private final Map<IndexKey, MatchingRuleIndex> indexMap = new LinkedHashMap<>();
    /** List of VLV indexes. */
    private final List<VLVIndex> vlvIndexes = new LinkedList<VLVIndex>();
    private final List<VLVIndex> vlvIndexes = new LinkedList<>();
    /** The suffix instance. */
    private Suffix suffix;
    /** The entry container. */
    private EntryContainer entryContainer;
    /** The DN2ID index. */
    private DN2ID dn2id;
    /** The DN2URI index. */
@@ -2715,11 +2694,6 @@
    /** Total entries processed. */
    private final AtomicLong entriesProcessed = new AtomicLong(0);
    /** The suffix instance. */
    private Suffix suffix;
    /** The entry container. */
    private EntryContainer entryContainer;
    /**
     * Create an instance of the rebuild index manager using the specified
     * parameters.
@@ -3017,7 +2991,7 @@
      scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
      bufferSortService = Executors.newFixedThreadPool(threadCount);
      ExecutorService rebuildIndexService = Executors.newFixedThreadPool(threadCount);
      List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
      List<Callable<Void>> tasks = new ArrayList<>(threadCount);
      for (int i = 0; i < threadCount; i++)
      {
        tasks.add(this);
@@ -3699,7 +3673,7 @@
     */
    private TmpEnv(File envPath) throws StorageRuntimeException
    {
      final Map<String, Object> returnValues = new HashMap<String, Object>();
      final Map<String, Object> returnValues = new HashMap<>();
      returnValues.put("getDBDirectory", envPath.getAbsolutePath());
      returnValues.put("getBackendId", DB_NAME);
      returnValues.put("getDBCacheSize", 0L);