mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
09.07.2015 641e89ef0e15c9edde69f3b8cf82c7dd5f68687a
opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
@@ -27,7 +27,6 @@
import static com.persistit.Transaction.CommitPolicy.*;
import static java.util.Arrays.*;
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.UtilityMessages.*;
@@ -69,6 +68,7 @@
import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadOnlyStorageException;
import org.opends.server.backends.pluggable.spi.ReadOperation;
import org.opends.server.backends.pluggable.spi.SequentialCursor;
import org.opends.server.backends.pluggable.spi.Storage;
import org.opends.server.backends.pluggable.spi.StorageInUseException;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
@@ -94,7 +94,6 @@
import com.persistit.Key;
import com.persistit.Persistit;
import com.persistit.Transaction;
import com.persistit.Tree;
import com.persistit.Value;
import com.persistit.Volume;
import com.persistit.VolumeSpecification;
@@ -102,12 +101,15 @@
import com.persistit.exception.PersistitException;
import com.persistit.exception.RollbackException;
import com.persistit.exception.TreeNotFoundException;
import com.persistit.mxbeans.CheckpointManagerMXBean;
/** PersistIt database implementation of the {@link Storage} engine. */
@SuppressWarnings("javadoc")
public final class PDBStorage implements Storage, Backupable, ConfigurationChangeListener<PDBBackendCfg>,
  DiskSpaceMonitorHandler
{
  private static final int IMPORT_DB_CACHE_SIZE = 4 * MB;
  private static final double MAX_SLEEP_ON_RETRY_MS = 50.0;
  private static final String VOLUME_NAME = "dj";
@@ -263,7 +265,6 @@
  /** PersistIt implementation of the {@link Importer} interface. */
  private final class ImporterImpl implements Importer
  {
    private final Map<TreeName, Tree> trees = new HashMap<>();
    private final Queue<Map<TreeName, Exchange>> allExchanges = new ConcurrentLinkedDeque<>();
    private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>()
    {
@@ -291,17 +292,60 @@
    }
    @Override
    public void createTree(final TreeName treeName)
    public void clearTree(final TreeName treeName)
    {
      final Transaction txn = db.getTransaction();
      deleteTree(txn, treeName);
      createTree(txn, treeName);
    }
    private void createTree(final Transaction txn, final TreeName treeName)
    {
      Exchange ex = null;
      try
      {
        final Tree tree = volume.getTree(mangleTreeName(treeName), true);
        trees.put(treeName, tree);
        txn.begin();
        ex = getNewExchange(treeName, true);
        txn.commit();
      }
      catch (final PersistitException e)
      catch (PersistitException e)
      {
        throw new StorageRuntimeException(e);
      }
      finally
      {
        txn.end();
        releaseExchangeSilenty(ex);
      }
    }
    private void deleteTree(Transaction txn, final TreeName treeName)
    {
      Exchange ex = null;
      try
      {
        txn.begin();
        ex = getNewExchange(treeName, true);
        ex.removeTree();
        txn.commit();
      }
      catch (PersistitException e)
      {
        throw new StorageRuntimeException(e);
      }
      finally
      {
        txn.end();
        releaseExchangeSilenty(ex);
      }
    }
    private void releaseExchangeSilenty(Exchange ex)
    {
      if ( ex != null)
      {
        db.releaseExchange(ex);
      }
    }
    @Override
@@ -321,21 +365,6 @@
    }
    @Override
    public boolean delete(final TreeName treeName, final ByteSequence key)
    {
      try
      {
        final Exchange ex = getExchangeFromCache(treeName);
        bytesToKey(ex.getKey(), key);
        return ex.remove();
      }
      catch (final PersistitException e)
      {
        throw new StorageRuntimeException(e);
      }
    }
    @Override
    public ByteString read(final TreeName treeName, final ByteSequence key)
    {
      try
@@ -362,6 +391,19 @@
      }
      return exchange;
    }
    @Override
    public SequentialCursor<ByteString, ByteString> openCursor(TreeName treeName)
    {
      try
      {
        return new CursorImpl(getNewExchange(treeName, false));
      }
      catch (PersistitException e)
      {
        throw new StorageRuntimeException(e);
      }
    }
  }
  /** Common interface for internal WriteableTransaction implementations. */
@@ -430,8 +472,7 @@
    public long getRecordCount(TreeName treeName)
    {
      // FIXME: is there a better/quicker way to do this?
      final Cursor<?, ?> cursor = openCursor(treeName);
      try
      try(final Cursor<?, ?> cursor = openCursor(treeName))
      {
        long count = 0;
        while (cursor.next())
@@ -440,10 +481,6 @@
        }
        return count;
      }
      finally
      {
        cursor.close();
      }
    }
    @Override
@@ -501,12 +538,6 @@
    }
    /**
     * Renaming a tree is not supported by this implementation.
     *
     * @throws UnsupportedOperationException
     *           always
     */
    @Override
    public void renameTree(final TreeName oldTreeName, final TreeName newTreeName)
    {
      throw new UnsupportedOperationException();
    }
    @Override
    public boolean update(final TreeName treeName, final ByteSequence key, final UpdateFunction f)
    {
      try
@@ -642,12 +673,6 @@
    }
    /**
     * Always rejects the rename.
     *
     * @throws ReadOnlyStorageException
     *           always
     */
    @Override
    public void renameTree(TreeName oldName, TreeName newName)
    {
      throw new ReadOnlyStorageException();
    }
    @Override
    public void deleteTree(TreeName name)
    {
      throw new ReadOnlyStorageException();
@@ -712,6 +737,15 @@
    cfg.addPDBChangeListener(this);
  }
  private Configuration buildImportConfiguration()
  {
    final Configuration dbCfg = buildConfiguration(AccessMode.READ_WRITE);
    getBufferPoolCfg(dbCfg).setMaximumMemory(IMPORT_DB_CACHE_SIZE);
    dbCfg.setCheckpointInterval(CheckpointManagerMXBean.MAXIMUM_CHECKPOINT_INTERVAL_S);
    dbCfg.setCommitPolicy(SOFT);
    return dbCfg;
  }
  private Configuration buildConfiguration(AccessMode accessMode)
  {
    this.accessMode = accessMode;
@@ -867,7 +901,7 @@
  @Override
  public Importer startImport() throws ConfigException, StorageRuntimeException
  {
    open0(buildConfiguration(AccessMode.READ_WRITE));
    open0(buildImportConfiguration());
    return new ImporterImpl();
  }