| | |
| | | |
| | | import static com.persistit.Transaction.CommitPolicy.*; |
| | | import static java.util.Arrays.*; |
| | | |
| | | import static org.opends.messages.BackendMessages.*; |
| | | import static org.opends.messages.ConfigMessages.*; |
| | | import static org.opends.messages.UtilityMessages.*; |
| | |
| | | import org.opends.server.backends.pluggable.spi.Importer; |
| | | import org.opends.server.backends.pluggable.spi.ReadOnlyStorageException; |
| | | import org.opends.server.backends.pluggable.spi.ReadOperation; |
| | | import org.opends.server.backends.pluggable.spi.SequentialCursor; |
| | | import org.opends.server.backends.pluggable.spi.Storage; |
| | | import org.opends.server.backends.pluggable.spi.StorageInUseException; |
| | | import org.opends.server.backends.pluggable.spi.StorageRuntimeException; |
| | |
| | | import com.persistit.Key; |
| | | import com.persistit.Persistit; |
| | | import com.persistit.Transaction; |
| | | import com.persistit.Tree; |
| | | import com.persistit.Value; |
| | | import com.persistit.Volume; |
| | | import com.persistit.VolumeSpecification; |
| | |
| | | import com.persistit.exception.PersistitException; |
| | | import com.persistit.exception.RollbackException; |
| | | import com.persistit.exception.TreeNotFoundException; |
| | | import com.persistit.mxbeans.CheckpointManagerMXBean; |
| | | |
| | | /** PersistIt database implementation of the {@link Storage} engine. */ |
| | | @SuppressWarnings("javadoc") |
| | | public final class PDBStorage implements Storage, Backupable, ConfigurationChangeListener<PDBBackendCfg>, |
| | | DiskSpaceMonitorHandler |
| | | { |
| | | private static final int IMPORT_DB_CACHE_SIZE = 4 * MB; |
| | | |
| | | private static final double MAX_SLEEP_ON_RETRY_MS = 50.0; |
| | | |
| | | private static final String VOLUME_NAME = "dj"; |
| | |
| | | /** PersistIt implementation of the {@link Importer} interface. */ |
| | | private final class ImporterImpl implements Importer |
| | | { |
| | | private final Map<TreeName, Tree> trees = new HashMap<>(); |
| | | private final Queue<Map<TreeName, Exchange>> allExchanges = new ConcurrentLinkedDeque<>(); |
| | | private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>() |
| | | { |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void createTree(final TreeName treeName) |
| | | public void clearTree(final TreeName treeName) |
| | | { |
| | | final Transaction txn = db.getTransaction(); |
| | | deleteTree(txn, treeName); |
| | | createTree(txn, treeName); |
| | | } |
| | | |
| | | private void createTree(final Transaction txn, final TreeName treeName) |
| | | { |
| | | Exchange ex = null; |
| | | try |
| | | { |
| | | final Tree tree = volume.getTree(mangleTreeName(treeName), true); |
| | | trees.put(treeName, tree); |
| | | txn.begin(); |
| | | ex = getNewExchange(treeName, true); |
| | | txn.commit(); |
| | | } |
| | | catch (final PersistitException e) |
| | | catch (PersistitException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | finally |
| | | { |
| | | txn.end(); |
| | | releaseExchangeSilenty(ex); |
| | | } |
| | | } |
| | | |
| | | private void deleteTree(Transaction txn, final TreeName treeName) |
| | | { |
| | | Exchange ex = null; |
| | | try |
| | | { |
| | | txn.begin(); |
| | | ex = getNewExchange(treeName, true); |
| | | ex.removeTree(); |
| | | txn.commit(); |
| | | } |
| | | catch (PersistitException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | finally |
| | | { |
| | | txn.end(); |
| | | releaseExchangeSilenty(ex); |
| | | } |
| | | } |
| | | |
| | | private void releaseExchangeSilenty(Exchange ex) |
| | | { |
| | | if ( ex != null) |
| | | { |
| | | db.releaseExchange(ex); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | |
| | | } |
| | | |
| | | @Override |
| | | public boolean delete(final TreeName treeName, final ByteSequence key) |
| | | { |
| | | try |
| | | { |
| | | final Exchange ex = getExchangeFromCache(treeName); |
| | | bytesToKey(ex.getKey(), key); |
| | | return ex.remove(); |
| | | } |
| | | catch (final PersistitException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public ByteString read(final TreeName treeName, final ByteSequence key) |
| | | { |
| | | try |
| | |
| | | } |
| | | return exchange; |
| | | } |
| | | |
| | | @Override |
| | | public SequentialCursor<ByteString, ByteString> openCursor(TreeName treeName) |
| | | { |
| | | try |
| | | { |
| | | return new CursorImpl(getNewExchange(treeName, false)); |
| | | } |
| | | catch (PersistitException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** Common interface for internal WriteableTransaction implementations. */ |
| | |
| | | public long getRecordCount(TreeName treeName) |
| | | { |
| | | // FIXME: is there a better/quicker way to do this? |
| | | final Cursor<?, ?> cursor = openCursor(treeName); |
| | | try |
| | | try(final Cursor<?, ?> cursor = openCursor(treeName)) |
| | | { |
| | | long count = 0; |
| | | while (cursor.next()) |
| | |
| | | } |
| | | return count; |
| | | } |
| | | finally |
| | | { |
| | | cursor.close(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void renameTree(final TreeName oldTreeName, final TreeName newTreeName) |
| | | { |
| | | throw new UnsupportedOperationException(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean update(final TreeName treeName, final ByteSequence key, final UpdateFunction f) |
| | | { |
| | | try |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void renameTree(TreeName oldName, TreeName newName) |
| | | { |
| | | throw new ReadOnlyStorageException(); |
| | | } |
| | | |
| | | @Override |
| | | public void deleteTree(TreeName name) |
| | | { |
| | | throw new ReadOnlyStorageException(); |
| | |
| | | cfg.addPDBChangeListener(this); |
| | | } |
| | | |
| | | private Configuration buildImportConfiguration() |
| | | { |
| | | final Configuration dbCfg = buildConfiguration(AccessMode.READ_WRITE); |
| | | getBufferPoolCfg(dbCfg).setMaximumMemory(IMPORT_DB_CACHE_SIZE); |
| | | dbCfg.setCheckpointInterval(CheckpointManagerMXBean.MAXIMUM_CHECKPOINT_INTERVAL_S); |
| | | dbCfg.setCommitPolicy(SOFT); |
| | | return dbCfg; |
| | | } |
| | | |
| | | private Configuration buildConfiguration(AccessMode accessMode) |
| | | { |
| | | this.accessMode = accessMode; |
| | |
| | | @Override |
| | | public Importer startImport() throws ConfigException, StorageRuntimeException |
| | | { |
| | | open0(buildConfiguration(AccessMode.READ_WRITE)); |
| | | open0(buildImportConfiguration()); |
| | | return new ImporterImpl(); |
| | | } |
| | | |