mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.20.2015 5d07ec161328a94de355aa4bf93918a2da5a8602
opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
@@ -46,6 +46,8 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -85,7 +87,6 @@
import com.persistit.Persistit;
import com.persistit.Transaction;
import com.persistit.Tree;
import com.persistit.TreeBuilder;
import com.persistit.Value;
import com.persistit.Volume;
import com.persistit.VolumeSpecification;
@@ -247,26 +248,31 @@
  /** PersistIt implementation of the {@link Importer} interface. */
  private final class ImporterImpl implements Importer
  {
    /** Persistit {@code TreeBuilder} created against the enclosing storage's {@code db} handle. */
    private final TreeBuilder importer = new TreeBuilder(db);
    /** Key buffer allocated once and refilled through {@code bytesToKey} when a record is stored. */
    private final Key importKey = new Key(db);
    /** Value buffer allocated once and refilled through {@code bytesToValue} when a record is stored. */
    private final Value importValue = new Value(db);
    /** Trees looked up by tree name when a record is stored. */
    private final Map<TreeName, Tree> trees = new HashMap<TreeName, Tree>();
    /** Every per-thread exchange map created by {@code exchanges}, kept so they can all be visited on close. */
    private final Queue<Map<TreeName, Exchange>> allExchanges = new ConcurrentLinkedDeque<>();
    private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>()
    {
      @Override
      protected Map<TreeName, Exchange> initialValue()
      {
        final Map<TreeName, Exchange> value = new HashMap<>();
        allExchanges.add(value);
        return value;
      }
    };
    @Override
    public void close()
    {
      try
      for (Map<TreeName, Exchange> map : allExchanges)
      {
        importer.merge();
        for (Exchange exchange : map.values())
        {
          db.releaseExchange(exchange);
        }
        map.clear();
      }
      catch (final Exception e)
      {
        throw new StorageRuntimeException(e);
      }
      finally
      {
        PersistItStorage.this.close();
      }
      PersistItStorage.this.close();
    }
    @Override
@@ -288,16 +294,59 @@
    {
      try
      {
        final Tree tree = trees.get(treeName);
        importer.store(tree,
            bytesToKey(importKey, key),
            bytesToValue(importValue, value));
        final Exchange ex = getExchangeFromCache(treeName);
        bytesToKey(ex.getKey(), key);
        bytesToValue(ex.getValue(), value);
        ex.store();
      }
      catch (final Exception e)
      {
        throw new StorageRuntimeException(e);
      }
    }
    @Override
    public boolean delete(final TreeName treeName, final ByteSequence key)
    {
      try
      {
        final Exchange ex = getExchangeFromCache(treeName);
        bytesToKey(ex.getKey(), key);
        return ex.remove();
      }
      catch (final PersistitException e)
      {
        throw new StorageRuntimeException(e);
      }
    }
    @Override
    public ByteString read(final TreeName treeName, final ByteSequence key)
    {
      try
      {
        final Exchange ex = getExchangeFromCache(treeName);
        bytesToKey(ex.getKey(), key);
        ex.fetch();
        return valueToBytes(ex.getValue());
      }
      catch (final PersistitException e)
      {
        throw new StorageRuntimeException(e);
      }
    }
    private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException
    {
      Map<TreeName, Exchange> threadExchanges = exchanges.get();
      Exchange exchange = threadExchanges.get(treeName);
      if (exchange == null)
      {
        exchange = getNewExchange(treeName, false);
        threadExchanges.put(treeName, exchange);
      }
      return exchange;
    }
  }
  /** PersistIt implementation of the {@link WriteableTransaction} interface. */
@@ -306,8 +355,7 @@
    private final Map<TreeName, Exchange> exchanges = new HashMap<TreeName, Exchange>();
    @Override
    public void put(final TreeName treeName, final ByteSequence key,
        final ByteSequence value)
    public void put(final TreeName treeName, final ByteSequence key, final ByteSequence value)
    {
      try
      {
@@ -475,8 +523,7 @@
      return b1.equals(b2);
    }
    private Exchange getExchangeFromCache(final TreeName treeName)
        throws PersistitException
    private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException
    {
      Exchange exchange = exchanges.get(treeName);
      if (exchange == null)
@@ -495,34 +542,18 @@
      }
      exchanges.clear();
    }
    private Exchange getNewExchange(final TreeName treeName, final boolean create)
        throws PersistitException
    {
      return db.getExchange(volume, mangleTreeName(treeName), create);
    }
  }
  private static void clearAndCreateDbDir(final File dbDir)
  private Exchange getNewExchange(final TreeName treeName, final boolean create) throws PersistitException
  {
    if (dbDir.exists())
    {
      for (final File child : dbDir.listFiles())
      {
        child.delete();
      }
    }
    else
    {
      dbDir.mkdirs();
    }
    return db.getExchange(volume, mangleTreeName(treeName), create);
  }
  /** Logger for this class. */
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /** Server context received by the constructor; source of the disk space monitor and memory quota. */
  private final ServerContext serverContext;
  /** Directory for this backend's files: the configured DB directory joined with the backend id. */
  private File backendDirectory;
  /** Persistit instance; assigned when the storage is opened. */
  private Persistit db;
  /** Volume passed to {@code db.getExchange(...)} whenever an exchange is obtained. */
  private Volume volume;
  /** Persistit configuration object created by the constructor. */
  private Configuration dbCfg;
  /** Backend configuration received by the constructor. */
  private PersistitBackendCfg config;
  /** Disk space monitor obtained from the server context. */
  private DiskSpaceMonitor diskMonitor;
  /** Memory quota obtained from the server context. */
  private MemoryQuota memQuota;
@@ -540,30 +571,36 @@
  // FIXME: should be package private once importer is decoupled.
  public PersistItStorage(final PersistitBackendCfg cfg, ServerContext serverContext) throws ConfigException
  {
    this.serverContext = serverContext;
    backendDirectory = new File(getFileForPath(cfg.getDBDirectory()), cfg.getBackendId());
    config = cfg;
    dbCfg = new Configuration();
    cfg.addPersistitChangeListener(this);
  }
  private Configuration buildConfiguration()
  {
    final Configuration dbCfg = new Configuration();
    dbCfg.setLogFile(new File(backendDirectory, VOLUME_NAME + ".log").getPath());
    dbCfg.setJournalPath(new File(backendDirectory, VOLUME_NAME + "_journal").getPath());
    dbCfg.setVolumeList(asList(new VolumeSpecification(new File(backendDirectory, VOLUME_NAME).getPath(), null,
        BUFFER_SIZE, 4096, Long.MAX_VALUE / BUFFER_SIZE, 2048, true, false, false)));
    final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg();
    final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg(dbCfg);
    bufferPoolCfg.setMaximumCount(Integer.MAX_VALUE);
    diskMonitor = serverContext.getDiskSpaceMonitor();
    memQuota = serverContext.getMemoryQuota();
    if (cfg.getDBCacheSize() > 0)
    if (config.getDBCacheSize() > 0)
    {
      bufferPoolCfg.setMaximumMemory(cfg.getDBCacheSize());
      memQuota.acquireMemory(cfg.getDBCacheSize());
      bufferPoolCfg.setMaximumMemory(config.getDBCacheSize());
      memQuota.acquireMemory(config.getDBCacheSize());
    }
    else
    {
      bufferPoolCfg.setFraction(cfg.getDBCachePercent() / 100.0f);
      memQuota.acquireMemory(memQuota.memPercentToBytes(cfg.getDBCachePercent()));
      bufferPoolCfg.setFraction(config.getDBCachePercent() / 100.0f);
      memQuota.acquireMemory(memQuota.memPercentToBytes(config.getDBCachePercent()));
    }
    dbCfg.setCommitPolicy(cfg.isDBTxnNoSync() ? SOFT : GROUP);
    cfg.addPersistitChangeListener(this);
    dbCfg.setCommitPolicy(config.isDBTxnNoSync() ? SOFT : GROUP);
    return dbCfg;
  }
  /** {@inheritDoc} */
@@ -594,21 +631,31 @@
    diskMonitor.deregisterMonitoredDirectory(getDirectory(), this);
  }
  private BufferPoolConfiguration getBufferPoolCfg()
  private static BufferPoolConfiguration getBufferPoolCfg(Configuration dbCfg)
  {
    return dbCfg.getBufferPoolMap().get(BUFFER_SIZE);
  }
  /** {@inheritDoc} */
  @Override
  public void open() throws Exception
  public void open() throws ConfigException, StorageRuntimeException
  {
    open0(buildConfiguration());
  }
  private void open0(final Configuration dbCfg) throws ConfigException
  {
    setupStorageFiles();
    try
    {
      if (db != null)
      {
        throw new IllegalStateException(
            "Database is already open, either the backend is enabled or an import is currently running.");
      }
      db = new Persistit(dbCfg);
      final long bufferCount = getBufferPoolCfg().computeBufferCount(db.getAvailableHeap());
      final long bufferCount = getBufferPoolCfg(dbCfg).computeBufferCount(db.getAvailableHeap());
      final long totalSize = bufferCount * BUFFER_SIZE / 1024;
      logger.info(NOTE_PERSISTIT_MEMORY_CFG, config.getBackendId(),
          bufferCount, BUFFER_SIZE, totalSize);
@@ -676,14 +723,13 @@
  /** {@inheritDoc} */
  @Override
  public Importer startImport() throws Exception
  public Importer startImport() throws ConfigException, StorageRuntimeException
  {
    clearAndCreateDbDir(backendDirectory);
    open();
    open0(buildConfiguration());
    return new ImporterImpl();
  }
  private String mangleTreeName(final TreeName treeName)
  private static String mangleTreeName(final TreeName treeName)
  {
    StringBuilder mangled = new StringBuilder();
    String name = treeName.toString();
@@ -879,19 +925,19 @@
   * TODO: it would be nice to use the low-level key/value APIs. They seem quite
   * inefficient at the moment for simple byte arrays.
   */
  private Key bytesToKey(final Key key, final ByteSequence bytes)
  private static Key bytesToKey(final Key key, final ByteSequence bytes)
  {
    final byte[] tmp = bytes.toByteArray();
    return key.clear().appendByteArray(tmp, 0, tmp.length);
  }
  private Value bytesToValue(final Value value, final ByteSequence bytes)
  private static Value bytesToValue(final Value value, final ByteSequence bytes)
  {
    value.clear().putByteArray(bytes.toByteArray());
    return value;
  }
  private ByteString valueToBytes(final Value value)
  private static ByteString valueToBytes(final Value value)
  {
    if (value.isDefined())
    {