| | |
| | | import java.util.ListIterator; |
| | | import java.util.Map; |
| | | import java.util.NoSuchElementException; |
| | | import java.util.Queue; |
| | | import java.util.concurrent.ConcurrentLinkedDeque; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | |
| | | import com.persistit.Persistit; |
| | | import com.persistit.Transaction; |
| | | import com.persistit.Tree; |
| | | import com.persistit.TreeBuilder; |
| | | import com.persistit.Value; |
| | | import com.persistit.Volume; |
| | | import com.persistit.VolumeSpecification; |
| | |
| | | /** PersistIt implementation of the {@link Importer} interface. */ |
| | | private final class ImporterImpl implements Importer |
| | | { |
| | | private final TreeBuilder importer = new TreeBuilder(db); |
| | | private final Key importKey = new Key(db); |
| | | private final Value importValue = new Value(db); |
| | | private final Map<TreeName, Tree> trees = new HashMap<TreeName, Tree>(); |
| | | private final Queue<Map<TreeName, Exchange>> allExchanges = new ConcurrentLinkedDeque<>(); |
| | | private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>() |
| | | { |
| | | @Override |
| | | protected Map<TreeName, Exchange> initialValue() |
| | | { |
| | | final Map<TreeName, Exchange> value = new HashMap<>(); |
| | | allExchanges.add(value); |
| | | return value; |
| | | } |
| | | }; |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | try |
| | | for (Map<TreeName, Exchange> map : allExchanges) |
| | | { |
| | | importer.merge(); |
| | | for (Exchange exchange : map.values()) |
| | | { |
| | | db.releaseExchange(exchange); |
| | | } |
| | | map.clear(); |
| | | } |
| | | catch (final Exception e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | finally |
| | | { |
| | | PersistItStorage.this.close(); |
| | | } |
| | | PersistItStorage.this.close(); |
| | | } |
| | | |
| | | @Override |
| | |
| | | { |
| | | try |
| | | { |
| | | final Tree tree = trees.get(treeName); |
| | | importer.store(tree, |
| | | bytesToKey(importKey, key), |
| | | bytesToValue(importValue, value)); |
| | | final Exchange ex = getExchangeFromCache(treeName); |
| | | bytesToKey(ex.getKey(), key); |
| | | bytesToValue(ex.getValue(), value); |
| | | ex.store(); |
| | | } |
| | | catch (final Exception e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public boolean delete(final TreeName treeName, final ByteSequence key) |
| | | { |
| | | try |
| | | { |
| | | final Exchange ex = getExchangeFromCache(treeName); |
| | | bytesToKey(ex.getKey(), key); |
| | | return ex.remove(); |
| | | } |
| | | catch (final PersistitException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public ByteString read(final TreeName treeName, final ByteSequence key) |
| | | { |
| | | try |
| | | { |
| | | final Exchange ex = getExchangeFromCache(treeName); |
| | | bytesToKey(ex.getKey(), key); |
| | | ex.fetch(); |
| | | return valueToBytes(ex.getValue()); |
| | | } |
| | | catch (final PersistitException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException |
| | | { |
| | | Map<TreeName, Exchange> threadExchanges = exchanges.get(); |
| | | Exchange exchange = threadExchanges.get(treeName); |
| | | if (exchange == null) |
| | | { |
| | | exchange = getNewExchange(treeName, false); |
| | | threadExchanges.put(treeName, exchange); |
| | | } |
| | | return exchange; |
| | | } |
| | | } |
| | | |
| | | /** PersistIt implementation of the {@link WriteableTransaction} interface. */ |
| | |
| | | private final Map<TreeName, Exchange> exchanges = new HashMap<TreeName, Exchange>(); |
| | | |
| | | @Override |
| | | public void put(final TreeName treeName, final ByteSequence key, |
| | | final ByteSequence value) |
| | | public void put(final TreeName treeName, final ByteSequence key, final ByteSequence value) |
| | | { |
| | | try |
| | | { |
| | |
| | | return b1.equals(b2); |
| | | } |
| | | |
| | | private Exchange getExchangeFromCache(final TreeName treeName) |
| | | throws PersistitException |
| | | private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException |
| | | { |
| | | Exchange exchange = exchanges.get(treeName); |
| | | if (exchange == null) |
| | |
| | | } |
| | | exchanges.clear(); |
| | | } |
| | | |
| | | private Exchange getNewExchange(final TreeName treeName, final boolean create) |
| | | throws PersistitException |
| | | { |
| | | return db.getExchange(volume, mangleTreeName(treeName), create); |
| | | } |
| | | } |
| | | |
| | | private static void clearAndCreateDbDir(final File dbDir) |
| | | private Exchange getNewExchange(final TreeName treeName, final boolean create) throws PersistitException |
| | | { |
| | | if (dbDir.exists()) |
| | | { |
| | | for (final File child : dbDir.listFiles()) |
| | | { |
| | | child.delete(); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | dbDir.mkdirs(); |
| | | } |
| | | return db.getExchange(volume, mangleTreeName(treeName), create); |
| | | } |
| | | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | private final ServerContext serverContext; |
| | | private File backendDirectory; |
| | | private Persistit db; |
| | | private Volume volume; |
| | | private Configuration dbCfg; |
| | | private PersistitBackendCfg config; |
| | | private DiskSpaceMonitor diskMonitor; |
| | | private MemoryQuota memQuota; |
| | |
| | | // FIXME: should be package private once importer is decoupled. |
| | | public PersistItStorage(final PersistitBackendCfg cfg, ServerContext serverContext) throws ConfigException |
| | | { |
| | | this.serverContext = serverContext; |
| | | backendDirectory = new File(getFileForPath(cfg.getDBDirectory()), cfg.getBackendId()); |
| | | config = cfg; |
| | | dbCfg = new Configuration(); |
| | | cfg.addPersistitChangeListener(this); |
| | | } |
| | | |
| | | private Configuration buildConfiguration() |
| | | { |
| | | final Configuration dbCfg = new Configuration(); |
| | | dbCfg.setLogFile(new File(backendDirectory, VOLUME_NAME + ".log").getPath()); |
| | | dbCfg.setJournalPath(new File(backendDirectory, VOLUME_NAME + "_journal").getPath()); |
| | | dbCfg.setVolumeList(asList(new VolumeSpecification(new File(backendDirectory, VOLUME_NAME).getPath(), null, |
| | | BUFFER_SIZE, 4096, Long.MAX_VALUE / BUFFER_SIZE, 2048, true, false, false))); |
| | | final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg(); |
| | | final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg(dbCfg); |
| | | bufferPoolCfg.setMaximumCount(Integer.MAX_VALUE); |
| | | |
| | | diskMonitor = serverContext.getDiskSpaceMonitor(); |
| | | memQuota = serverContext.getMemoryQuota(); |
| | | if (cfg.getDBCacheSize() > 0) |
| | | if (config.getDBCacheSize() > 0) |
| | | { |
| | | bufferPoolCfg.setMaximumMemory(cfg.getDBCacheSize()); |
| | | memQuota.acquireMemory(cfg.getDBCacheSize()); |
| | | bufferPoolCfg.setMaximumMemory(config.getDBCacheSize()); |
| | | memQuota.acquireMemory(config.getDBCacheSize()); |
| | | } |
| | | else |
| | | { |
| | | bufferPoolCfg.setFraction(cfg.getDBCachePercent() / 100.0f); |
| | | memQuota.acquireMemory(memQuota.memPercentToBytes(cfg.getDBCachePercent())); |
| | | bufferPoolCfg.setFraction(config.getDBCachePercent() / 100.0f); |
| | | memQuota.acquireMemory(memQuota.memPercentToBytes(config.getDBCachePercent())); |
| | | } |
| | | dbCfg.setCommitPolicy(cfg.isDBTxnNoSync() ? SOFT : GROUP); |
| | | cfg.addPersistitChangeListener(this); |
| | | dbCfg.setCommitPolicy(config.isDBTxnNoSync() ? SOFT : GROUP); |
| | | return dbCfg; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | diskMonitor.deregisterMonitoredDirectory(getDirectory(), this); |
| | | } |
| | | |
| | | private BufferPoolConfiguration getBufferPoolCfg() |
| | | private static BufferPoolConfiguration getBufferPoolCfg(Configuration dbCfg) |
| | | { |
| | | return dbCfg.getBufferPoolMap().get(BUFFER_SIZE); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void open() throws Exception |
| | | public void open() throws ConfigException, StorageRuntimeException |
| | | { |
| | | open0(buildConfiguration()); |
| | | } |
| | | |
| | | private void open0(final Configuration dbCfg) throws ConfigException |
| | | { |
| | | setupStorageFiles(); |
| | | try |
| | | { |
| | | if (db != null) |
| | | { |
| | | throw new IllegalStateException( |
| | | "Database is already open, either the backend is enabled or an import is currently running."); |
| | | } |
| | | db = new Persistit(dbCfg); |
| | | |
| | | final long bufferCount = getBufferPoolCfg().computeBufferCount(db.getAvailableHeap()); |
| | | final long bufferCount = getBufferPoolCfg(dbCfg).computeBufferCount(db.getAvailableHeap()); |
| | | final long totalSize = bufferCount * BUFFER_SIZE / 1024; |
| | | logger.info(NOTE_PERSISTIT_MEMORY_CFG, config.getBackendId(), |
| | | bufferCount, BUFFER_SIZE, totalSize); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Importer startImport() throws Exception |
| | | public Importer startImport() throws ConfigException, StorageRuntimeException |
| | | { |
| | | clearAndCreateDbDir(backendDirectory); |
| | | open(); |
| | | open0(buildConfiguration()); |
| | | return new ImporterImpl(); |
| | | } |
| | | |
| | | private String mangleTreeName(final TreeName treeName) |
| | | private static String mangleTreeName(final TreeName treeName) |
| | | { |
| | | StringBuilder mangled = new StringBuilder(); |
| | | String name = treeName.toString(); |
| | |
| | | * TODO: it would be nice to use the low-level key/value APIs. They seem quite |
| | | * inefficient at the moment for simple byte arrays. |
| | | */ |
| | | private Key bytesToKey(final Key key, final ByteSequence bytes) |
| | | private static Key bytesToKey(final Key key, final ByteSequence bytes) |
| | | { |
| | | final byte[] tmp = bytes.toByteArray(); |
| | | return key.clear().appendByteArray(tmp, 0, tmp.length); |
| | | } |
| | | |
| | | private Value bytesToValue(final Value value, final ByteSequence bytes) |
| | | private static Value bytesToValue(final Value value, final ByteSequence bytes) |
| | | { |
| | | value.clear().putByteArray(bytes.toByteArray()); |
| | | return value; |
| | | } |
| | | |
| | | private ByteString valueToBytes(final Value value) |
| | | private static ByteString valueToBytes(final Value value) |
| | | { |
| | | if (value.isDefined()) |
| | | { |