From 5d07ec161328a94de355aa4bf93918a2da5a8602 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 30 Apr 2015 14:20:06 +0000
Subject: [PATCH] OPENDJ-1801 (CR-6815) Revise usage of storage.open() and startImport()
---
opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java | 166 +++++++++++++++++++++++++++++++++++--------------------
1 files changed, 106 insertions(+), 60 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
index ade7349..c1e9d80 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
@@ -46,6 +46,8 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -85,7 +87,6 @@
import com.persistit.Persistit;
import com.persistit.Transaction;
import com.persistit.Tree;
-import com.persistit.TreeBuilder;
import com.persistit.Value;
import com.persistit.Volume;
import com.persistit.VolumeSpecification;
@@ -247,26 +248,31 @@
/** PersistIt implementation of the {@link Importer} interface. */
private final class ImporterImpl implements Importer
{
- private final TreeBuilder importer = new TreeBuilder(db);
- private final Key importKey = new Key(db);
- private final Value importValue = new Value(db);
private final Map<TreeName, Tree> trees = new HashMap<TreeName, Tree>();
+ private final Queue<Map<TreeName, Exchange>> allExchanges = new ConcurrentLinkedDeque<>();
+ private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>()
+ {
+ @Override
+ protected Map<TreeName, Exchange> initialValue()
+ {
+ final Map<TreeName, Exchange> value = new HashMap<>();
+ allExchanges.add(value);
+ return value;
+ }
+ };
@Override
public void close()
{
- try
+ for (Map<TreeName, Exchange> map : allExchanges)
{
- importer.merge();
+ for (Exchange exchange : map.values())
+ {
+ db.releaseExchange(exchange);
+ }
+ map.clear();
}
- catch (final Exception e)
- {
- throw new StorageRuntimeException(e);
- }
- finally
- {
- PersistItStorage.this.close();
- }
+ PersistItStorage.this.close();
}
@Override
@@ -288,16 +294,59 @@
{
try
{
- final Tree tree = trees.get(treeName);
- importer.store(tree,
- bytesToKey(importKey, key),
- bytesToValue(importValue, value));
+ final Exchange ex = getExchangeFromCache(treeName);
+ bytesToKey(ex.getKey(), key);
+ bytesToValue(ex.getValue(), value);
+ ex.store();
}
catch (final Exception e)
{
throw new StorageRuntimeException(e);
}
}
+
+ @Override
+ public boolean delete(final TreeName treeName, final ByteSequence key)
+ {
+ try
+ {
+ final Exchange ex = getExchangeFromCache(treeName);
+ bytesToKey(ex.getKey(), key);
+ return ex.remove();
+ }
+ catch (final PersistitException e)
+ {
+ throw new StorageRuntimeException(e);
+ }
+ }
+
+ @Override
+ public ByteString read(final TreeName treeName, final ByteSequence key)
+ {
+ try
+ {
+ final Exchange ex = getExchangeFromCache(treeName);
+ bytesToKey(ex.getKey(), key);
+ ex.fetch();
+ return valueToBytes(ex.getValue());
+ }
+ catch (final PersistitException e)
+ {
+ throw new StorageRuntimeException(e);
+ }
+ }
+
+ private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException
+ {
+ Map<TreeName, Exchange> threadExchanges = exchanges.get();
+ Exchange exchange = threadExchanges.get(treeName);
+ if (exchange == null)
+ {
+ exchange = getNewExchange(treeName, false);
+ threadExchanges.put(treeName, exchange);
+ }
+ return exchange;
+ }
}
/** PersistIt implementation of the {@link WriteableTransaction} interface. */
@@ -306,8 +355,7 @@
private final Map<TreeName, Exchange> exchanges = new HashMap<TreeName, Exchange>();
@Override
- public void put(final TreeName treeName, final ByteSequence key,
- final ByteSequence value)
+ public void put(final TreeName treeName, final ByteSequence key, final ByteSequence value)
{
try
{
@@ -475,8 +523,7 @@
return b1.equals(b2);
}
- private Exchange getExchangeFromCache(final TreeName treeName)
- throws PersistitException
+ private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException
{
Exchange exchange = exchanges.get(treeName);
if (exchange == null)
@@ -495,34 +542,18 @@
}
exchanges.clear();
}
-
- private Exchange getNewExchange(final TreeName treeName, final boolean create)
- throws PersistitException
- {
- return db.getExchange(volume, mangleTreeName(treeName), create);
- }
}
- private static void clearAndCreateDbDir(final File dbDir)
+ private Exchange getNewExchange(final TreeName treeName, final boolean create) throws PersistitException
{
- if (dbDir.exists())
- {
- for (final File child : dbDir.listFiles())
- {
- child.delete();
- }
- }
- else
- {
- dbDir.mkdirs();
- }
+ return db.getExchange(volume, mangleTreeName(treeName), create);
}
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+ private final ServerContext serverContext;
private File backendDirectory;
private Persistit db;
private Volume volume;
- private Configuration dbCfg;
private PersistitBackendCfg config;
private DiskSpaceMonitor diskMonitor;
private MemoryQuota memQuota;
@@ -540,30 +571,36 @@
// FIXME: should be package private once importer is decoupled.
public PersistItStorage(final PersistitBackendCfg cfg, ServerContext serverContext) throws ConfigException
{
+ this.serverContext = serverContext;
backendDirectory = new File(getFileForPath(cfg.getDBDirectory()), cfg.getBackendId());
config = cfg;
- dbCfg = new Configuration();
+ cfg.addPersistitChangeListener(this);
+ }
+
+ private Configuration buildConfiguration()
+ {
+ final Configuration dbCfg = new Configuration();
dbCfg.setLogFile(new File(backendDirectory, VOLUME_NAME + ".log").getPath());
dbCfg.setJournalPath(new File(backendDirectory, VOLUME_NAME + "_journal").getPath());
dbCfg.setVolumeList(asList(new VolumeSpecification(new File(backendDirectory, VOLUME_NAME).getPath(), null,
BUFFER_SIZE, 4096, Long.MAX_VALUE / BUFFER_SIZE, 2048, true, false, false)));
- final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg();
+ final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg(dbCfg);
bufferPoolCfg.setMaximumCount(Integer.MAX_VALUE);
diskMonitor = serverContext.getDiskSpaceMonitor();
memQuota = serverContext.getMemoryQuota();
- if (cfg.getDBCacheSize() > 0)
+ if (config.getDBCacheSize() > 0)
{
- bufferPoolCfg.setMaximumMemory(cfg.getDBCacheSize());
- memQuota.acquireMemory(cfg.getDBCacheSize());
+ bufferPoolCfg.setMaximumMemory(config.getDBCacheSize());
+ memQuota.acquireMemory(config.getDBCacheSize());
}
else
{
- bufferPoolCfg.setFraction(cfg.getDBCachePercent() / 100.0f);
- memQuota.acquireMemory(memQuota.memPercentToBytes(cfg.getDBCachePercent()));
+ bufferPoolCfg.setFraction(config.getDBCachePercent() / 100.0f);
+ memQuota.acquireMemory(memQuota.memPercentToBytes(config.getDBCachePercent()));
}
- dbCfg.setCommitPolicy(cfg.isDBTxnNoSync() ? SOFT : GROUP);
- cfg.addPersistitChangeListener(this);
+ dbCfg.setCommitPolicy(config.isDBTxnNoSync() ? SOFT : GROUP);
+ return dbCfg;
}
/** {@inheritDoc} */
@@ -594,21 +631,31 @@
diskMonitor.deregisterMonitoredDirectory(getDirectory(), this);
}
- private BufferPoolConfiguration getBufferPoolCfg()
+ private static BufferPoolConfiguration getBufferPoolCfg(Configuration dbCfg)
{
return dbCfg.getBufferPoolMap().get(BUFFER_SIZE);
}
/** {@inheritDoc} */
@Override
- public void open() throws Exception
+ public void open() throws ConfigException, StorageRuntimeException
+ {
+ open0(buildConfiguration());
+ }
+
+ private void open0(final Configuration dbCfg) throws ConfigException
{
setupStorageFiles();
try
{
+ if (db != null)
+ {
+ throw new IllegalStateException(
+ "Database is already open, either the backend is enabled or an import is currently running.");
+ }
db = new Persistit(dbCfg);
- final long bufferCount = getBufferPoolCfg().computeBufferCount(db.getAvailableHeap());
+ final long bufferCount = getBufferPoolCfg(dbCfg).computeBufferCount(db.getAvailableHeap());
final long totalSize = bufferCount * BUFFER_SIZE / 1024;
logger.info(NOTE_PERSISTIT_MEMORY_CFG, config.getBackendId(),
bufferCount, BUFFER_SIZE, totalSize);
@@ -676,14 +723,13 @@
/** {@inheritDoc} */
@Override
- public Importer startImport() throws Exception
+ public Importer startImport() throws ConfigException, StorageRuntimeException
{
- clearAndCreateDbDir(backendDirectory);
- open();
+ open0(buildConfiguration());
return new ImporterImpl();
}
- private String mangleTreeName(final TreeName treeName)
+ private static String mangleTreeName(final TreeName treeName)
{
StringBuilder mangled = new StringBuilder();
String name = treeName.toString();
@@ -879,19 +925,19 @@
* TODO: it would be nice to use the low-level key/value APIs. They seem quite
* inefficient at the moment for simple byte arrays.
*/
- private Key bytesToKey(final Key key, final ByteSequence bytes)
+ private static Key bytesToKey(final Key key, final ByteSequence bytes)
{
final byte[] tmp = bytes.toByteArray();
return key.clear().appendByteArray(tmp, 0, tmp.length);
}
- private Value bytesToValue(final Value value, final ByteSequence bytes)
+ private static Value bytesToValue(final Value value, final ByteSequence bytes)
{
value.clear().putByteArray(bytes.toByteArray());
return value;
}
- private ByteString valueToBytes(final Value value)
+ private static ByteString valueToBytes(final Value value)
{
if (value.isDefined())
{
--
Gitblit v1.10.0