opendj-server-legacy/pom.xml
@@ -193,17 +193,18 @@ <artifactId>handler-jdbc</artifactId> </dependency> <!-- slf4j libraries --> <!-- <dependency> --> <!-- <groupId>org.slf4j</groupId> --> <!-- <artifactId>slf4j-jdk14</artifactId> --> <!-- </dependency> --> <dependency> <groupId>org.slf4j</groupId> <artifactId>jul-to-slf4j</artifactId> </dependency> <!-- Source: https://mvnrepository.com/artifact/com.github.ben-manes.caffeine/caffeine --> <dependency> <groupId>com.github.ben-manes.caffeine</groupId> <artifactId>caffeine</artifactId> <version>3.2.3</version> <scope>compile</scope> </dependency> <!-- mail --> <dependency> opendj-server-legacy/src/main/java/org/opends/server/backends/cassandra/Storage.java
@@ -62,19 +62,17 @@ import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.cql.Statement; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; public class Storage implements org.opends.server.backends.pluggable.spi.Storage, ConfigurationChangeListener<CASBackendCfg>{ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); //private final ServerContext serverContext; private CASBackendCfg config; public Storage(CASBackendCfg cfg, ServerContext serverContext) { //this.serverContext = serverContext; this.config = cfg; cfg.addCASChangeListener(this); } @@ -101,15 +99,10 @@ CqlSession session=null; final LoadingCache<String,PreparedStatement> prepared=CacheBuilder.newBuilder() final LoadingCache<String,PreparedStatement> prepared = Caffeine.newBuilder() .expireAfterAccess(Duration.ofMinutes(10)) .maximumSize(4096) .build(new CacheLoader<String,PreparedStatement>(){ @Override public PreparedStatement load(String query) throws Exception { return session.prepare(query); } }); .build(query -> session.prepare(query)); ResultSet execute(Statement<?> statement) { if (logger.isTraceEnabled()) { @@ -133,7 +126,7 @@ .withConfigLoader(DriverConfigLoader.fromDefaults(Storage.class.getClassLoader())) .build(); if (AccessMode.READ_WRITE.equals(accessMode)) { execute(prepared.getUnchecked("CREATE KEYSPACE IF NOT EXISTS "+getKeyspaceName()+" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};").bind().setExecutionProfileName(profile)); execute(prepared.get("CREATE KEYSPACE IF NOT EXISTS "+getKeyspaceName()+" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};").bind().setExecutionProfileName(profile)); } storageStatus = StorageStatus.working(); } @@ -163,7 +156,7 @@ @Override public void removeStorageFiles() throws StorageRuntimeException { final Boolean isOpen=getStorageStatus().isWorking(); final boolean isOpen=getStorageStatus().isWorking(); if (!isOpen) { try { open(AccessMode.READ_WRITE); @@ -172,7 +165,7 @@ } } try { execute(prepared.getUnchecked("TRUNCATE TABLE "+getTableName()+";").bind().setExecutionProfileName(profile)); execute(prepared.get("TRUNCATE TABLE "+getTableName()+";").bind().setExecutionProfileName(profile)); }catch (Throwable e) {} if (!isOpen) { close(); @@ -210,7 +203,7 @@ @Override public void openTree(TreeName name, boolean createOnDemand) { if (createOnDemand) { execute(prepared.getUnchecked("CREATE TABLE IF NOT EXISTS "+getTableName()+" (baseDN text,indexId text,key blob,value blob,PRIMARY KEY ((baseDN,indexId),key));").bind().setExecutionProfileName(profile)); execute(prepared.get("CREATE TABLE IF NOT EXISTS "+getTableName()+" (baseDN text,indexId text,key blob,value blob,PRIMARY KEY ((baseDN,indexId),key));").bind().setExecutionProfileName(profile)); } } @@ -222,7 +215,7 @@ @Override public ByteString read(TreeName treeName, ByteSequence key) { final Row row=execute( prepared.getUnchecked("SELECT value FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId and key=:key").bind() prepared.get("SELECT value FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId and key=:key").bind() .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) .setByteBuffer("key", ByteBuffer.wrap(key.toByteArray())) ).one(); @@ -237,7 +230,7 @@ @Override public long getRecordCount(TreeName treeName) { return execute( prepared.getUnchecked("SELECT count(*) FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId").bind() prepared.get("SELECT count(*) FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId").bind() .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) ).one().getLong(0); } @@ -247,7 +240,7 @@ checkReadOnly(); openTree(treeName,true); execute( prepared.getUnchecked("DELETE FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId").bind() prepared.get("DELETE FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId").bind() .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) ); } @@ -256,7 +249,7 @@ public void put(TreeName treeName, ByteSequence key, ByteSequence value) { checkReadOnly(); execute( prepared.getUnchecked("INSERT INTO "+getTableName()+" (baseDN,indexId,key,value) VALUES (:baseDN,:indexId,:key,:value)").bind() prepared.get("INSERT INTO "+getTableName()+" (baseDN,indexId,key,value) VALUES (:baseDN,:indexId,:key,:value)").bind() .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) .setByteBuffer("key", ByteBuffer.wrap(key.toByteArray())) .setByteBuffer("value",ByteBuffer.wrap(value.toByteArray())) @@ -285,7 +278,7 @@ public boolean delete(TreeName treeName, ByteSequence key) { checkReadOnly(); execute( prepared.getUnchecked("DELETE FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId and key=:key").bind() prepared.get("DELETE FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId and key=:key").bind() .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) .setByteBuffer("key", ByteBuffer.wrap(key.toByteArray())) ); @@ -316,7 +309,7 @@ ResultSet full(){ return execute( prepared.getUnchecked("SELECT key,value FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId ORDER BY key").bind() prepared.get("SELECT key,value FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId ORDER BY key").bind() .setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId()) ); } opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/CachedConnection.java
@@ -15,40 +15,33 @@ */ package org.opends.server.backends.jdbc; import com.google.common.cache.*; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.RemovalCause; import java.sql.*; import java.util.LinkedList; import java.time.Duration; import java.util.Map; import java.util.Properties; import java.util.Queue; import java.util.concurrent.*; public class CachedConnection implements Connection { final Connection parent; static LoadingCache<String, BlockingQueue<CachedConnection>> cached= CacheBuilder.newBuilder() .expireAfterAccess(Long.parseLong(System.getProperty("org.openidentityplatform.opendj.jdbc.ttl","15000")), TimeUnit.MILLISECONDS) .removalListener(new RemovalListener<String, BlockingQueue<CachedConnection>>() { @Override public void onRemoval(RemovalNotification<String, BlockingQueue<CachedConnection>> notification) { assert notification.getValue() != null; for (CachedConnection con: notification.getValue()) { static LoadingCache<String, BlockingQueue<CachedConnection>> cached = Caffeine.newBuilder() .expireAfterAccess(Duration.ofMillis(Long.parseLong(System.getProperty("org.openidentityplatform.opendj.jdbc.ttl","15000")))) .removalListener((String key, BlockingQueue<CachedConnection> value, RemovalCause cause) -> { for (CachedConnection con : value) { try { if (!con.isClosed()) { con.parent.close(); } } catch (SQLException e) { } // ignore } } }) .build(new CacheLoader<String, BlockingQueue<CachedConnection>>() { @Override public BlockingQueue<CachedConnection> load(String connectionString) throws Exception { return new LinkedBlockingQueue<>(); } }); .build(conStr -> new LinkedBlockingQueue<>()); final String connectionString; public CachedConnection(String connectionString,Connection parent) { @@ -62,6 +55,7 @@ static Connection getConnection(String connectionString, final int waitTime) throws Exception { CachedConnection con=cached.get(connectionString).poll(waitTime,TimeUnit.MILLISECONDS); while(con!=null) { if (!con.isValid(0)) { try { @@ -127,11 +121,7 @@ @Override public void close() throws SQLException { rollback(); try { cached.get(connectionString).add(this); } catch (ExecutionException e) { throw new RuntimeException(e); } } @Override opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/Storage.java
@@ -15,9 +15,9 @@ */ package org.opends.server.backends.jdbc; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.opendj.config.server.ConfigChangeResult; @@ -36,9 +36,9 @@ import java.nio.ByteBuffer; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.sql.*; import java.util.*; import java.util.concurrent.ExecutionException; import static org.opends.server.backends.pluggable.spi.StorageUtils.addErrorMessage; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; @@ -113,10 +113,9 @@ storageStatus = StorageStatus.lockedDown(LocalizableMessage.raw("closed")); } final LoadingCache<TreeName,String> tree2table= CacheBuilder.newBuilder() .build(new CacheLoader<TreeName, String>() { @Override public String load(TreeName treeName) throws Exception { final LoadingCache<TreeName,String> tree2table = Caffeine.newBuilder() .build(treeName -> { try { final MessageDigest md = MessageDigest.getInstance("SHA-224"); final byte[] messageDigest = md.digest(treeName.toString().getBytes()); final StringBuilder hashtext = new StringBuilder(56); @@ -126,15 +125,13 @@ hashtext.append(hex); } return "opendj_"+hashtext; } catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); } }); String getTableName(TreeName treeName) { try { return tree2table.get(treeName); } catch (ExecutionException e) { throw new RuntimeException(e); } } @Override @@ -195,7 +192,7 @@ } } final static byte[] NULL=new byte[]{(byte)0}; static final byte[] NULL=new byte[]{(byte)0}; static byte[] real2db(byte[] real) { return real.length==0?NULL:real; @@ -204,11 +201,10 @@ return Arrays.equals(NULL,db)?new byte[0]:db; } final LoadingCache<ByteBuffer,String> key2hash= CacheBuilder.newBuilder() final LoadingCache<ByteBuffer,String> key2hash = Caffeine.newBuilder() .softValues() .build(new CacheLoader<ByteBuffer, String>() { @Override public String load(ByteBuffer key) throws Exception { .build(key -> { try { final MessageDigest md = MessageDigest.getInstance("SHA-512"); final byte[] messageDigest = md.digest(key.array()); final StringBuilder hashtext = new StringBuilder(128); @@ -218,8 +214,11 @@ hashtext.append(hex); } return hashtext.toString(); } catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); } }); private class ReadableTransactionImpl implements ReadableTransaction { final Connection con; boolean isReadOnly=true; @@ -236,7 +235,7 @@ try(ResultSet rc=executeResultSet(statement)) { return rc.next() ? ByteString.wrap(rc.getBytes("v")) : null; } }catch (SQLException|ExecutionException e) { }catch (SQLException e) { throw new StorageRuntimeException(e); } } @@ -327,12 +326,12 @@ public void put(TreeName treeName, ByteSequence key, ByteSequence value) { try { upsert(treeName, key, value); } catch (SQLException|ExecutionException e) { } catch (SQLException e) { throw new RuntimeException(e); } } boolean upsert(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException, ExecutionException { boolean upsert(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException { final String driverName=((CachedConnection) con).parent.getClass().getName(); if (driverName.contains("postgres")) { //postgres upsert try (final PreparedStatement statement = con.prepareStatement("insert into " + getTableName(treeName) + " (h,k,v) values (?,?,?) ON CONFLICT (h, k) DO UPDATE set v=excluded.v")) { @@ -367,7 +366,7 @@ } } boolean insert(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException, ExecutionException { boolean insert(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException { try (final PreparedStatement statement = con.prepareStatement("insert into " + getTableName(treeName) + " (h,k,v) select ?,?,? where not exists (select 1 from "+getTableName(treeName)+" where h=? and k=? )")) { statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray()))); statement.setBytes(2, real2db(key.toByteArray())); @@ -378,7 +377,7 @@ } } boolean update(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException, ExecutionException { boolean update(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException { try (final PreparedStatement statement=con.prepareStatement("update "+getTableName(treeName)+" set v=? where h=? and k=?")){ statement.setBytes(1,value.toByteArray()); statement.setString(2,key2hash.get(ByteBuffer.wrap(key.toByteArray()))); @@ -409,7 +408,7 @@ statement.setString(1,key2hash.get(ByteBuffer.wrap(key.toByteArray()))); statement.setBytes(2,real2db(key.toByteArray())); return (execute(statement)==1 && statement.getUpdateCount()>0); }catch (SQLException|ExecutionException e) { }catch (SQLException e) { throw new StorageRuntimeException(e); } }