mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
16.49.2015 c1234a77530eb2c68c06c62c9a69f899f0e4a6e6
Storage's transaction must be created and used in the same thread (CR-6665)
3 files modified
133 ■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java 2 ●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java 129 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java 2 ●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
@@ -733,7 +733,7 @@
      {
        rootContainer = initializeRootContainer();
      }
      new Importer(rebuildConfig, cfg, serverContext).rebuildIndexes(rootContainer);
      new Importer(rootContainer, rebuildConfig, cfg, serverContext).rebuildIndexes();
    }
    catch (ExecutionException execEx)
    {
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -187,7 +187,7 @@
  private final TmpEnv tmpEnv;
  /** Root container. */
  private RootContainer rootContainer;
  private final RootContainer rootContainer;
  /** Import configuration. */
  private final LDIFImportConfig importConfiguration;
@@ -286,9 +286,10 @@
   * @throws ConfigException
   *           If a problem occurs during initialization.
   */
  Importer(RebuildConfig rebuildConfig, PluggableBackendCfg cfg, ServerContext serverContext)
      throws InitializationException, StorageRuntimeException, ConfigException
  Importer(RootContainer rootContainer, RebuildConfig rebuildConfig, PluggableBackendCfg cfg,
      ServerContext serverContext) throws InitializationException, StorageRuntimeException, ConfigException
  {
    this.rootContainer = rootContainer;
    this.importConfiguration = null;
    this.serverContext = serverContext;
    this.tmpEnv = null;
@@ -324,9 +325,10 @@
   * @throws StorageRuntimeException
   *           If an error occurred when opening the DB.
   */
  Importer(LDIFImportConfig importConfiguration, PluggableBackendCfg backendCfg, ServerContext serverContext)
      throws InitializationException, ConfigException, StorageRuntimeException
  Importer(RootContainer rootContainer, LDIFImportConfig importConfiguration, PluggableBackendCfg backendCfg,
      ServerContext serverContext) throws InitializationException, ConfigException, StorageRuntimeException
  {
    this.rootContainer = rootContainer;
    this.rebuildManager = null;
    this.importConfiguration = importConfiguration;
    this.serverContext = serverContext;
@@ -822,12 +824,9 @@
   * @throws ExecutionException
   *           If an execution error occurred.
   */
  public void rebuildIndexes(RootContainer rootContainer)
      throws ConfigException, InitializationException, StorageRuntimeException,
  public void rebuildIndexes() throws ConfigException, InitializationException, StorageRuntimeException,
      InterruptedException, ExecutionException
  {
    this.rootContainer = rootContainer;
    try
    {
      if (rebuildManager.rebuildConfig.isClearDegradedState())
@@ -836,7 +835,7 @@
      }
      else
      {
        rebuildIndexes();
        doRebuildIndexes();
      }
    }
    catch (Exception e)
@@ -862,7 +861,7 @@
    });
  }
  private void rebuildIndexes() throws Exception
  private void doRebuildIndexes() throws Exception
  {
    final long startTime = System.currentTimeMillis();
    final Storage storage = rootContainer.getStorage();
@@ -902,9 +901,8 @@
   * @throws Exception
   *           If the import failed
   */
  public LDIFImportResult processImport(RootContainer rootContainer) throws Exception
  public LDIFImportResult processImport() throws Exception
  {
    this.rootContainer = rootContainer;
    try {
      try
      {
@@ -1070,14 +1068,7 @@
    final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
    final Storage storage = rootContainer.getStorage();
    storage.write(new WriteOperation()
    {
      @Override
      public void run(WriteableTransaction txn) throws Exception
      {
        execService.submit(new MigrateExistingTask(txn)).get();
      }
    });
    execService.submit(new MigrateExistingTask(storage)).get();
    final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
    if (importConfiguration.appendToExistingData()
@@ -1085,27 +1076,20 @@
    {
      for (int i = 0; i < threadCount; i++)
      {
        tasks.add(new AppendReplaceTask(storage.getWriteableTransaction()));
        tasks.add(new AppendReplaceTask(storage));
      }
    }
    else
    {
      for (int i = 0; i < threadCount; i++)
      {
        tasks.add(new ImportTask(storage.getWriteableTransaction()));
        tasks.add(new ImportTask(storage));
      }
    }
    execService.invokeAll(tasks);
    tasks.clear();
    storage.write(new WriteOperation()
    {
      @Override
      public void run(WriteableTransaction txn) throws Exception
      {
        execService.submit(new MigrateExcludedTask(txn)).get();
      }
    });
    execService.submit(new MigrateExcludedTask(storage)).get();
    stopScratchFileWriters();
    getAll(scratchFileWriterFutures);
@@ -1267,14 +1251,14 @@
  /** Task used to migrate excluded branch. */
  private final class MigrateExcludedTask extends ImportTask
  {
    private MigrateExcludedTask(final WriteableTransaction txn)
    private MigrateExcludedTask(final Storage storage)
    {
      super(txn);
      super(storage);
    }
    /** {@inheritDoc} */
    @Override
    public Void call() throws Exception
    Void call0(WriteableTransaction txn) throws Exception
    {
      for (Suffix suffix : dnSuffixMap.values())
      {
@@ -1304,7 +1288,7 @@
                {
                  EntryID id = new EntryID(cursor.getValue());
                  Entry entry = entryContainer.getID2Entry().get(txn, id);
                  processEntry(entry, rootContainer.getNextEntryID(), suffix);
                  processEntry(txn, entry, rootContainer.getNextEntryID(), suffix);
                  migratedCount++;
                  success = cursor.next();
                }
@@ -1331,14 +1315,14 @@
  /** Task to migrate existing entries. */
  private final class MigrateExistingTask extends ImportTask
  {
    private MigrateExistingTask(final WriteableTransaction txn)
    private MigrateExistingTask(final Storage storage)
    {
      super(txn);
      super(storage);
    }
    /** {@inheritDoc} */
    @Override
    public Void call() throws Exception
    Void call0(WriteableTransaction txn) throws Exception
    {
      for (Suffix suffix : dnSuffixMap.values())
      {
@@ -1360,7 +1344,7 @@
              {
                EntryID id = new EntryID(key);
                Entry entry = entryContainer.getID2Entry().get(txn, id);
                processEntry(entry, rootContainer.getNextEntryID(), suffix);
                processEntry(txn, entry, rootContainer.getNextEntryID(), suffix);
                migratedCount++;
                success = cursor.next();
              }
@@ -1420,9 +1404,9 @@
   */
  private class AppendReplaceTask extends ImportTask
  {
    public AppendReplaceTask(final WriteableTransaction txn)
    public AppendReplaceTask(final Storage storage)
    {
      super(txn);
      super(storage);
    }
    private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
@@ -1433,7 +1417,7 @@
    /** {@inheritDoc} */
    @Override
    public Void call() throws Exception
    Void call0(WriteableTransaction txn) throws Exception
    {
      try
      {
@@ -1452,7 +1436,7 @@
          }
          entryID = entryInfo.getEntryID();
          Suffix suffix = entryInfo.getSuffix();
          processEntry(entry, suffix);
          processEntry(txn, entry, suffix);
        }
        flushIndexBuffers();
        return null;
@@ -1469,7 +1453,7 @@
      }
    }
    void processEntry(Entry entry, Suffix suffix)
    void processEntry(WriteableTransaction txn, Entry entry, Suffix suffix)
        throws DirectoryException, StorageRuntimeException, InterruptedException
    {
      DN entryDN = entry.getName();
@@ -1481,7 +1465,7 @@
      }
      if (oldEntry == null)
      {
        if (!skipDNValidation && !dnSanityCheck(entryDN, entry, suffix))
        if (!skipDNValidation && !dnSanityCheck(txn, entryDN, entry, suffix))
        {
          suffix.removePending(entryDN);
          return;
@@ -1494,7 +1478,7 @@
        suffix.removePending(entryDN);
        entryID = oldID;
      }
      processDN2URI(suffix, oldEntry, entry);
      processDN2URI(txn, suffix, oldEntry, entry);
      suffix.getID2Entry().put(txn, entryID, entry);
      if (oldEntry != null)
      {
@@ -1504,7 +1488,7 @@
      {
        processIndexes(suffix, entry, entryID);
      }
      processVLVIndexes(suffix, entry, entryID);
      processVLVIndexes(txn, suffix, entry, entryID);
      importCount.getAndIncrement();
    }
@@ -1545,21 +1529,33 @@
   */
  private class ImportTask implements Callable<Void>
  {
    WriteableTransaction txn;
    private final Storage storage;
    private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<IndexKey, IndexOutputBuffer>();
    private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
    private final EntryInformation entryInfo = new EntryInformation();
    private final IndexKey dnIndexKey = new IndexKey(DN_TYPE, DN2ID, 1);
    public ImportTask(final WriteableTransaction txn)
    public ImportTask(final Storage storage)
    {
      this.txn = txn;
      this.storage = storage;
    }
    /** {@inheritDoc} */
    @Override
    public Void call() throws Exception
    public final Void call() throws Exception
    {
      storage.write(new WriteOperation()
      {
        @Override
        public void run(WriteableTransaction txn) throws Exception
        {
          call0(txn);
        }
      });
      return null;
    }
    Void call0(WriteableTransaction txn) throws Exception {
      try
      {
        while (true)
@@ -1576,7 +1572,7 @@
          }
          EntryID entryID = entryInfo.getEntryID();
          Suffix suffix = entryInfo.getSuffix();
          processEntry(entry, entryID, suffix);
          processEntry(txn, entry, entryID, suffix);
        }
        flushIndexBuffers();
        return null;
@@ -1593,26 +1589,26 @@
      }
    }
    void processEntry(Entry entry, EntryID entryID, Suffix suffix)
    void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID, Suffix suffix)
        throws DirectoryException, StorageRuntimeException, InterruptedException
    {
      DN entryDN = entry.getName();
      if (!skipDNValidation && !dnSanityCheck(entryDN, entry, suffix))
      if (!skipDNValidation && !dnSanityCheck(txn, entryDN, entry, suffix))
      {
        suffix.removePending(entryDN);
        return;
      }
      suffix.removePending(entryDN);
      processDN2ID(suffix, entryDN, entryID);
      processDN2URI(suffix, null, entry);
      processDN2URI(txn, suffix, null, entry);
      processIndexes(suffix, entry, entryID);
      processVLVIndexes(suffix, entry, entryID);
      processVLVIndexes(txn, suffix, entry, entryID);
      suffix.getID2Entry().put(txn, entryID, entry);
      importCount.getAndIncrement();
    }
    /** Examine the DN for duplicates and missing parents. */
    boolean dnSanityCheck(DN entryDN, Entry entry, Suffix suffix)
    boolean dnSanityCheck(WriteableTransaction txn, DN entryDN, Entry entry, Suffix suffix)
        throws StorageRuntimeException, InterruptedException
    {
      //Perform parent checking.
@@ -1666,7 +1662,8 @@
      }
    }
    void processVLVIndexes(Suffix suffix, Entry entry, EntryID entryID) throws DirectoryException
    void processVLVIndexes(WriteableTransaction txn, Suffix suffix, Entry entry, EntryID entryID)
        throws DirectoryException
    {
      final EntryContainer entryContainer = suffix.getEntryContainer();
      final IndexBuffer buffer = new IndexBuffer(entryContainer);
@@ -1766,7 +1763,8 @@
      indexIDToECMap.putIfAbsent(indexID, suffix.getEntryContainer());
    }
    void processDN2URI(Suffix suffix, Entry oldEntry, Entry newEntry) throws StorageRuntimeException
    void processDN2URI(WriteableTransaction txn, Suffix suffix, Entry oldEntry, Entry newEntry)
        throws StorageRuntimeException
    {
      DN2URI dn2uri = suffix.getDN2URI();
      if (oldEntry != null)
@@ -2850,7 +2848,6 @@
     */
    void printStartMessage(WriteableTransaction txn) throws StorageRuntimeException
    {
      this.txn = txn;
      totalEntries = suffix.getID2Entry().getRecordCount(txn);
      switch (rebuildConfig.getRebuildMode())
@@ -2896,7 +2893,7 @@
    /** {@inheritDoc} */
    @Override
    public Void call() throws Exception
    Void call0(WriteableTransaction txn) throws Exception
    {
      ID2Entry id2entry = entryContainer.getID2Entry();
      Cursor<ByteString, ByteString> cursor = txn.openCursor(id2entry.getName());
@@ -2912,7 +2909,7 @@
          Entry entry =
              ID2Entry.entryFromDatabase(cursor.getValue(),
                  entryContainer.getRootContainer().getCompressedSchema());
          processEntry(entry, entryID);
          processEntry(txn, entry, entryID);
          entriesProcessed.getAndIncrement();
        }
        flushIndexBuffers();
@@ -3339,7 +3336,7 @@
      return result;
    }
    private void processEntry(Entry entry, EntryID entryID)
    private void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID)
        throws DirectoryException, StorageRuntimeException, InterruptedException
    {
      if (dn2id != null)
@@ -3348,13 +3345,13 @@
      }
      if (dn2uri != null)
      {
        processDN2URI(suffix, null, entry);
        processDN2URI(txn, suffix, null, entry);
      }
      processIndexes(entry, entryID);
      processVLVIndexes(entry, entryID);
      processVLVIndexes(txn, entry, entryID);
    }
    private void processVLVIndexes(Entry entry, EntryID entryID)
    private void processVLVIndexes(WriteableTransaction txn, Entry entry, EntryID entryID)
        throws StorageRuntimeException, DirectoryException
    {
      final IndexBuffer buffer = new IndexBuffer(entryContainer);
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
@@ -318,7 +318,7 @@
  {
    try
    {
      return new Importer(importConfig, config, serverContext).processImport(this);
      return new Importer(this, importConfig, config, serverContext).processImport();
    }
    catch (DirectoryException e)
    {