| | |
| | | import com.datastax.oss.driver.api.core.cql.ResultSet; |
| | | import com.datastax.oss.driver.api.core.cql.Row; |
| | | import com.datastax.oss.driver.api.core.cql.Statement; |
| | | import com.google.common.cache.CacheBuilder; |
| | | import com.google.common.cache.CacheLoader; |
| | | import com.google.common.cache.LoadingCache; |
| | | |
| | | import com.github.benmanes.caffeine.cache.Caffeine; |
| | | import com.github.benmanes.caffeine.cache.LoadingCache; |
| | | |
| | | public class Storage implements org.opends.server.backends.pluggable.spi.Storage, ConfigurationChangeListener<CASBackendCfg>{ |
| | | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | //private final ServerContext serverContext; |
| | | private CASBackendCfg config; |
| | | |
| | | |
| | | public Storage(CASBackendCfg cfg, ServerContext serverContext) { |
| | | //this.serverContext = serverContext; |
| | | this.config = cfg; |
| | | cfg.addCASChangeListener(this); |
| | | cfg.addCASChangeListener(this); |
| | | } |
| | | |
| | | //config |
| | |
| | | @Override |
| | | public ConfigChangeResult applyConfigurationChange(CASBackendCfg cfg) { |
| | | final ConfigChangeResult ccr = new ConfigChangeResult(); |
| | | try |
| | | { |
| | | this.config = cfg; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | addErrorMessage(ccr, LocalizableMessage.raw(stackTraceToSingleLineString(e))); |
| | | } |
| | | return ccr; |
| | | try |
| | | { |
| | | this.config = cfg; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | addErrorMessage(ccr, LocalizableMessage.raw(stackTraceToSingleLineString(e))); |
| | | } |
| | | return ccr; |
| | | } |
| | | |
| | | CqlSession session=null; |
| | | |
| | | final LoadingCache<String,PreparedStatement> prepared=CacheBuilder.newBuilder() |
| | | .expireAfterAccess(Duration.ofMinutes(10)) |
| | | .maximumSize(4096) |
| | | .build(new CacheLoader<String,PreparedStatement>(){ |
| | | @Override |
| | | public PreparedStatement load(String query) throws Exception { |
| | | return session.prepare(query); |
| | | } |
| | | }); |
| | | |
| | | |
| | | final LoadingCache<String,PreparedStatement> prepared = Caffeine.newBuilder() |
| | | .expireAfterAccess(Duration.ofMinutes(10)) |
| | | .maximumSize(4096) |
| | | .build(query -> session.prepare(query)); |
| | | |
| | | ResultSet execute(Statement<?> statement) { |
| | | if (logger.isTraceEnabled()) { |
| | | final ResultSet res=session.execute(statement.setTracing(true)); |
| | | logger.trace(LocalizableMessage.raw( |
| | | "cassandra: %s" |
| | | ,res.getExecutionInfo().getQueryTrace().getParameters() |
| | | ) |
| | | ); |
| | | ) |
| | | ); |
| | | return res; |
| | | } |
| | | return session.execute(statement); |
| | |
| | | public void open(AccessMode accessMode) throws Exception { |
| | | this.accessMode=accessMode; |
| | | session=CqlSession.builder() |
| | | .withApplicationName("OpenDJ "+getKeyspaceName()+"."+config.getBackendId()) |
| | | .withConfigLoader(DriverConfigLoader.fromDefaults(Storage.class.getClassLoader())) |
| | | .build(); |
| | | .withApplicationName("OpenDJ "+getKeyspaceName()+"."+config.getBackendId()) |
| | | .withConfigLoader(DriverConfigLoader.fromDefaults(Storage.class.getClassLoader())) |
| | | .build(); |
| | | if (AccessMode.READ_WRITE.equals(accessMode)) { |
| | | execute(prepared.getUnchecked("CREATE KEYSPACE IF NOT EXISTS "+getKeyspaceName()+" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};").bind().setExecutionProfileName(profile)); |
| | | execute(prepared.get("CREATE KEYSPACE IF NOT EXISTS "+getKeyspaceName()+" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};").bind().setExecutionProfileName(profile)); |
| | | } |
| | | storageStatus = StorageStatus.working(); |
| | | } |
| | |
| | | |
| | | @Override |
| | | public void removeStorageFiles() throws StorageRuntimeException { |
| | | final Boolean isOpen=getStorageStatus().isWorking(); |
| | | final boolean isOpen=getStorageStatus().isWorking(); |
| | | if (!isOpen) { |
| | | try { |
| | | open(AccessMode.READ_WRITE); |
| | |
| | | } |
| | | } |
| | | try { |
| | | execute(prepared.getUnchecked("TRUNCATE TABLE "+getTableName()+";").bind().setExecutionProfileName(profile)); |
| | | execute(prepared.get("TRUNCATE TABLE "+getTableName()+";").bind().setExecutionProfileName(profile)); |
| | | }catch (Throwable e) {} |
| | | if (!isOpen) { |
| | | close(); |
| | |
| | | @Override |
| | | public void openTree(TreeName name, boolean createOnDemand) { |
| | | if (createOnDemand) { |
| | | execute(prepared.getUnchecked("CREATE TABLE IF NOT EXISTS "+getTableName()+" (baseDN text,indexId text,key blob,value blob,PRIMARY KEY ((baseDN,indexId),key));").bind().setExecutionProfileName(profile)); |
| | | execute(prepared.get("CREATE TABLE IF NOT EXISTS "+getTableName()+" (baseDN text,indexId text,key blob,value blob,PRIMARY KEY ((baseDN,indexId),key));").bind().setExecutionProfileName(profile)); |
| | | } |
| | | } |
| | | |
| | |
| | | @Override |
| | | public ByteString read(TreeName treeName, ByteSequence key) { |
| | | final Row row=execute( |
| | | prepared.getUnchecked("SELECT value FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId and key=:key").bind() |
| | | .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) |
| | | .setByteBuffer("key", ByteBuffer.wrap(key.toByteArray())) |
| | | ).one(); |
| | | prepared.get("SELECT value FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId and key=:key").bind() |
| | | .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) |
| | | .setByteBuffer("key", ByteBuffer.wrap(key.toByteArray())) |
| | | ).one(); |
| | | return row==null?null:ByteString.wrap(row.getByteBuffer("value").array()); |
| | | } |
| | | |
| | |
| | | @Override |
| | | public long getRecordCount(TreeName treeName) { |
| | | return execute( |
| | | prepared.getUnchecked("SELECT count(*) FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId").bind() |
| | | .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) |
| | | ).one().getLong(0); |
| | | prepared.get("SELECT count(*) FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId").bind() |
| | | .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) |
| | | ).one().getLong(0); |
| | | } |
| | | |
| | | @Override |
| | |
| | | checkReadOnly(); |
| | | openTree(treeName,true); |
| | | execute( |
| | | prepared.getUnchecked("DELETE FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId").bind() |
| | | .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) |
| | | ); |
| | | prepared.get("DELETE FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId").bind() |
| | | .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) |
| | | ); |
| | | } |
| | | |
| | | @Override |
| | | public void put(TreeName treeName, ByteSequence key, ByteSequence value) { |
| | | checkReadOnly(); |
| | | execute( |
| | | prepared.getUnchecked("INSERT INTO "+getTableName()+" (baseDN,indexId,key,value) VALUES (:baseDN,:indexId,:key,:value)").bind() |
| | | .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) |
| | | prepared.get("INSERT INTO "+getTableName()+" (baseDN,indexId,key,value) VALUES (:baseDN,:indexId,:key,:value)").bind() |
| | | .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) |
| | | .setByteBuffer("key", ByteBuffer.wrap(key.toByteArray())) |
| | | .setByteBuffer("value",ByteBuffer.wrap(value.toByteArray())) |
| | | ); |
| | | ); |
| | | } |
| | | |
| | | @Override |
| | |
| | | final ByteString oldValue=read(treeName,key); |
| | | final ByteSequence newValue=f.computeNewValue(oldValue); |
| | | if (Objects.equals(newValue, oldValue)) |
| | | { |
| | | { |
| | | return false; |
| | | } |
| | | if (newValue == null) |
| | | { |
| | | delete(treeName, key); |
| | | return true; |
| | | } |
| | | put(treeName,key,newValue); |
| | | } |
| | | if (newValue == null) |
| | | { |
| | | delete(treeName, key); |
| | | return true; |
| | | } |
| | | put(treeName,key,newValue); |
| | | return true; |
| | | } |
| | | |
| | |
| | | public boolean delete(TreeName treeName, ByteSequence key) { |
| | | checkReadOnly(); |
| | | execute( |
| | | prepared.getUnchecked("DELETE FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId and key=:key").bind() |
| | | .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) |
| | | .setByteBuffer("key", ByteBuffer.wrap(key.toByteArray())) |
| | | ); |
| | | prepared.get("DELETE FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId and key=:key").bind() |
| | | .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) |
| | | .setByteBuffer("key", ByteBuffer.wrap(key.toByteArray())) |
| | | ); |
| | | return true; |
| | | } |
| | | |
| | |
| | | |
| | | ResultSet full(){ |
| | | return execute( |
| | | prepared.getUnchecked("SELECT key,value FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId ORDER BY key").bind() |
| | | .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) |
| | | ); |
| | | prepared.get("SELECT key,value FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId ORDER BY key").bind() |
| | | .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) |
| | | ); |
| | | } |
| | | |
| | | @Override |
| | |
| | | return true; |
| | | } |
| | | ct++; |
| | | } |
| | | } |
| | | current=null; |
| | | return false; |
| | | } |