From 641e89ef0e15c9edde69f3b8cf82c7dd5f68687a Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Wed, 30 Sep 2015 14:28:07 +0000
Subject: [PATCH] OPENDJ-2016: New on disk merge import strategy based on storage engine.
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java | 116 +++++++++++++++++++++++++++++++++++++--------------------
1 files changed, 75 insertions(+), 41 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
index 61ab41f..ddf46d8 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
@@ -27,7 +27,6 @@
import static com.persistit.Transaction.CommitPolicy.*;
import static java.util.Arrays.*;
-
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.UtilityMessages.*;
@@ -69,6 +68,7 @@
import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadOnlyStorageException;
import org.opends.server.backends.pluggable.spi.ReadOperation;
+import org.opends.server.backends.pluggable.spi.SequentialCursor;
import org.opends.server.backends.pluggable.spi.Storage;
import org.opends.server.backends.pluggable.spi.StorageInUseException;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
@@ -94,7 +94,6 @@
import com.persistit.Key;
import com.persistit.Persistit;
import com.persistit.Transaction;
-import com.persistit.Tree;
import com.persistit.Value;
import com.persistit.Volume;
import com.persistit.VolumeSpecification;
@@ -102,12 +101,15 @@
import com.persistit.exception.PersistitException;
import com.persistit.exception.RollbackException;
import com.persistit.exception.TreeNotFoundException;
+import com.persistit.mxbeans.CheckpointManagerMXBean;
/** PersistIt database implementation of the {@link Storage} engine. */
@SuppressWarnings("javadoc")
public final class PDBStorage implements Storage, Backupable, ConfigurationChangeListener<PDBBackendCfg>,
DiskSpaceMonitorHandler
{
+ private static final int IMPORT_DB_CACHE_SIZE = 4 * MB;
+
private static final double MAX_SLEEP_ON_RETRY_MS = 50.0;
private static final String VOLUME_NAME = "dj";
@@ -263,7 +265,6 @@
/** PersistIt implementation of the {@link Importer} interface. */
private final class ImporterImpl implements Importer
{
- private final Map<TreeName, Tree> trees = new HashMap<>();
private final Queue<Map<TreeName, Exchange>> allExchanges = new ConcurrentLinkedDeque<>();
private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>()
{
@@ -291,17 +292,60 @@
}
@Override
- public void createTree(final TreeName treeName)
+ public void clearTree(final TreeName treeName)
{
+ final Transaction txn = db.getTransaction();
+ deleteTree(txn, treeName);
+ createTree(txn, treeName);
+ }
+
+ private void createTree(final Transaction txn, final TreeName treeName)
+ {
+ Exchange ex = null;
try
{
- final Tree tree = volume.getTree(mangleTreeName(treeName), true);
- trees.put(treeName, tree);
+ txn.begin();
+ ex = getNewExchange(treeName, true);
+ txn.commit();
}
- catch (final PersistitException e)
+ catch (PersistitException e)
{
throw new StorageRuntimeException(e);
}
+ finally
+ {
+ txn.end();
+ releaseExchangeSilenty(ex);
+ }
+ }
+
+ private void deleteTree(Transaction txn, final TreeName treeName)
+ {
+ Exchange ex = null;
+ try
+ {
+ txn.begin();
+ ex = getNewExchange(treeName, true);
+ ex.removeTree();
+ txn.commit();
+ }
+ catch (PersistitException e)
+ {
+ throw new StorageRuntimeException(e);
+ }
+ finally
+ {
+ txn.end();
+ releaseExchangeSilenty(ex);
+ }
+ }
+
+ private void releaseExchangeSilenty(Exchange ex)
+ {
+ if ( ex != null)
+ {
+ db.releaseExchange(ex);
+ }
}
@Override
@@ -321,21 +365,6 @@
}
@Override
- public boolean delete(final TreeName treeName, final ByteSequence key)
- {
- try
- {
- final Exchange ex = getExchangeFromCache(treeName);
- bytesToKey(ex.getKey(), key);
- return ex.remove();
- }
- catch (final PersistitException e)
- {
- throw new StorageRuntimeException(e);
- }
- }
-
- @Override
public ByteString read(final TreeName treeName, final ByteSequence key)
{
try
@@ -362,6 +391,19 @@
}
return exchange;
}
+
+ @Override
+ public SequentialCursor<ByteString, ByteString> openCursor(TreeName treeName)
+ {
+ try
+ {
+ return new CursorImpl(getNewExchange(treeName, false));
+ }
+ catch (PersistitException e)
+ {
+ throw new StorageRuntimeException(e);
+ }
+ }
}
/** Common interface for internal WriteableTransaction implementations. */
@@ -430,8 +472,7 @@
public long getRecordCount(TreeName treeName)
{
// FIXME: is there a better/quicker way to do this?
- final Cursor<?, ?> cursor = openCursor(treeName);
- try
+ try(final Cursor<?, ?> cursor = openCursor(treeName))
{
long count = 0;
while (cursor.next())
@@ -440,10 +481,6 @@
}
return count;
}
- finally
- {
- cursor.close();
- }
}
@Override
@@ -501,12 +538,6 @@
}
@Override
- public void renameTree(final TreeName oldTreeName, final TreeName newTreeName)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public boolean update(final TreeName treeName, final ByteSequence key, final UpdateFunction f)
{
try
@@ -642,12 +673,6 @@
}
@Override
- public void renameTree(TreeName oldName, TreeName newName)
- {
- throw new ReadOnlyStorageException();
- }
-
- @Override
public void deleteTree(TreeName name)
{
throw new ReadOnlyStorageException();
@@ -712,6 +737,15 @@
cfg.addPDBChangeListener(this);
}
+ private Configuration buildImportConfiguration()
+ {
+ final Configuration dbCfg = buildConfiguration(AccessMode.READ_WRITE);
+ getBufferPoolCfg(dbCfg).setMaximumMemory(IMPORT_DB_CACHE_SIZE);
+ dbCfg.setCheckpointInterval(CheckpointManagerMXBean.MAXIMUM_CHECKPOINT_INTERVAL_S);
+ dbCfg.setCommitPolicy(SOFT);
+ return dbCfg;
+ }
+
private Configuration buildConfiguration(AccessMode accessMode)
{
this.accessMode = accessMode;
@@ -867,7 +901,7 @@
@Override
public Importer startImport() throws ConfigException, StorageRuntimeException
{
- open0(buildConfiguration(AccessMode.READ_WRITE));
+ open0(buildImportConfiguration());
return new ImporterImpl();
}
--
Gitblit v1.10.0