opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/CachedConnection.java
@@ -18,51 +18,70 @@ import com.google.common.cache.*; import java.sql.*; import java.util.LinkedList; import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.Queue; import java.util.concurrent.*; public class CachedConnection implements Connection { final Connection parent; static LoadingCache<String,Connection> cached= CacheBuilder.newBuilder() static LoadingCache<String, BlockingQueue<Connection>> cached= CacheBuilder.newBuilder() .expireAfterAccess(Long.parseLong(System.getProperty("org.openidentityplatform.opendj.jdbc.ttl","15000")), TimeUnit.MILLISECONDS) .removalListener(new RemovalListener<String, Connection>() { .removalListener(new RemovalListener<String, BlockingQueue<Connection>>() { @Override public void onRemoval(RemovalNotification<String, Connection> notification) { try { if (!notification.getValue().isClosed()) { notification.getValue().close(); public void onRemoval(RemovalNotification<String, BlockingQueue<Connection>> notification) { assert notification.getValue() != null; for (Connection con: notification.getValue()) { try { if (!con.isClosed()) { con.close(); } } catch (SQLException e) { } } } catch (SQLException e) { } } }) .build(new CacheLoader<String, Connection>() { .build(new CacheLoader<String, BlockingQueue<Connection>>() { @Override public Connection load(String connectionString) throws Exception { return DriverManager.getConnection(connectionString); public BlockingQueue<Connection> load(String connectionString) throws Exception { return new LinkedBlockingQueue<>(); } }); public CachedConnection(Connection parent) { final String connectionString; public CachedConnection(String connectionString,Connection parent) { this.connectionString=connectionString; this.parent = parent; } static CachedConnection getConnection(String connectionString) throws Exception { Connection con=cached.get(connectionString); try { if (con != null && !con.isValid(0)) { cached.invalidate(connectionString); con.close(); con = cached.get(connectionString); static Connection getConnection(String connectionString) throws Exception { return getConnection(connectionString,0); } static Connection getConnection(String connectionString, final int waitTime) throws Exception { Connection con=cached.get(connectionString).poll(waitTime,TimeUnit.MILLISECONDS); while(con!=null) { if (!con.isValid(0)) { try { con.close(); } catch (SQLException e) { con=null; } con=cached.get(connectionString).poll(); }else{ return con; } } catch (SQLException e) { con = null; } con.setAutoCommit(false); return new CachedConnection(con); try { con = DriverManager.getConnection(connectionString); con.setAutoCommit(false); con.setTransactionIsolation(TRANSACTION_READ_COMMITTED); return new CachedConnection(connectionString, con); }catch (SQLException e) { //max_connection server error: try recursion for reuse connection return getConnection(connectionString,(waitTime==0)?1:waitTime*2); } } @Override @@ -107,7 +126,12 @@ @Override public void close() throws SQLException { //rollback(); rollback(); try { cached.get(connectionString).add(this); } catch (ExecutionException e) { throw new RuntimeException(e); } } @Override opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/Storage.java
@@ -34,6 +34,7 @@ import org.opends.server.types.RestoreConfig; import org.opends.server.util.BackupManager; import java.nio.ByteBuffer; import java.security.MessageDigest; import java.sql.*; import java.util.*; @@ -80,11 +81,11 @@ return statement.executeQuery(); } boolean execute(PreparedStatement statement) throws SQLException { int execute(PreparedStatement statement) throws SQLException { if (logger.isTraceEnabled()) { logger.trace(LocalizableMessage.raw("jdbc: %s",statement)); } return statement.execute(); return statement.executeUpdate(); } Connection getConnection() throws Exception { @@ -118,7 +119,7 @@ public String load(TreeName treeName) throws Exception { final MessageDigest md = MessageDigest.getInstance("SHA-224"); final byte[] messageDigest = md.digest(treeName.toString().getBytes()); final StringBuilder hashtext = new StringBuilder(); final StringBuilder hashtext = new StringBuilder(56); for (byte b : messageDigest) { String hex = Integer.toHexString(0xff & b); if (hex.length() == 1) hashtext.append('0'); @@ -203,14 +204,14 @@ return Arrays.equals(NULL,db)?new byte[0]:db; } final LoadingCache<byte[],String> key2hash= CacheBuilder.newBuilder() .maximumSize(32000) .build(new CacheLoader<byte[], String>() { final LoadingCache<ByteBuffer,String> key2hash= CacheBuilder.newBuilder() .softValues() .build(new CacheLoader<ByteBuffer, String>() { @Override public String load(byte[] key) throws Exception { public String load(ByteBuffer key) throws Exception { final MessageDigest md = MessageDigest.getInstance("SHA-512"); final byte[] messageDigest = md.digest(key); final StringBuilder hashtext = new StringBuilder(); final byte[] messageDigest = md.digest(key.array()); final StringBuilder hashtext = new StringBuilder(128); for (byte b : messageDigest) { String hex = Integer.toHexString(0xff & b); if (hex.length() == 1) hashtext.append('0'); @@ -230,7 +231,7 @@ @Override public ByteString read(TreeName treeName, ByteSequence key) { try (final PreparedStatement statement=con.prepareStatement("select v from "+getTableName(treeName)+" where h=? and k=?")){ statement.setString(1,key2hash.get(key.toByteArray())); statement.setString(1,key2hash.get(ByteBuffer.wrap(key.toByteArray()))); statement.setBytes(2,real2db(key.toByteArray())); try(ResultSet rc=executeResultSet(statement)) { return rc.next() ? ByteString.wrap(rc.getBytes("v")) : null; @@ -324,14 +325,65 @@ @Override public void put(TreeName treeName, ByteSequence key, ByteSequence value) { delete(treeName,key); try (final PreparedStatement statement=con.prepareStatement("insert into "+getTableName(treeName)+" (h,k,v) values(?,?,?) ")){ statement.setString(1,key2hash.get(key.toByteArray())); statement.setBytes(2,real2db(key.toByteArray())); statement.setBytes(3,value.toByteArray()); execute(statement); }catch (SQLException|ExecutionException e) { throw new StorageRuntimeException(e); try { upsert(treeName, key, value); } catch (SQLException|ExecutionException e) { throw new RuntimeException(e); } } boolean upsert(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException, ExecutionException { final String driverName=((CachedConnection) con).parent.getClass().getName(); if (driverName.contains("postgres")) { //postgres upsert try (final PreparedStatement statement = con.prepareStatement("insert into " + getTableName(treeName) + " (h,k,v) values (?,?,?) ON CONFLICT (h, k) DO UPDATE set v=excluded.v")) { statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray()))); statement.setBytes(2, real2db(key.toByteArray())); statement.setBytes(3, value.toByteArray()); return (execute(statement) == 1 && statement.getUpdateCount() > 0); } }else if (driverName.contains("mysql")) { //mysql upsert try (final PreparedStatement statement = con.prepareStatement("insert into " + getTableName(treeName) + " (h,k,v) values (?,?,?) as new ON DUPLICATE KEY UPDATE v=new.v")) { statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray()))); statement.setBytes(2, real2db(key.toByteArray())); statement.setBytes(3, value.toByteArray()); return (execute(statement) == 1 && statement.getUpdateCount() > 0); } }else if (driverName.contains("oracle")) { //ANSI MERGE without ; try (final PreparedStatement statement = con.prepareStatement("merge into " + getTableName(treeName) + " old using (select ? h,? k,? v from dual) new on (old.h=new.h and old.k=new.k) WHEN MATCHED THEN UPDATE SET old.v=new.v WHEN NOT MATCHED THEN INSERT (h,k,v) VALUES (new.h,new.k,new.v)")) { statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray()))); statement.setBytes(2, real2db(key.toByteArray())); statement.setBytes(3, value.toByteArray()); return (execute(statement) == 1 && statement.getUpdateCount() > 0); } }else if (driverName.contains("microsoft")) { //ANSI MERGE with ; try (final PreparedStatement statement = con.prepareStatement("merge into " + getTableName(treeName) + " old using (select ? h,? k,? v) new on (old.h=new.h and old.k=new.k) WHEN MATCHED THEN UPDATE SET old.v=new.v WHEN NOT MATCHED THEN INSERT (h,k,v) VALUES (new.h,new.k,new.v);")) { statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray()))); statement.setBytes(2, real2db(key.toByteArray())); statement.setBytes(3, value.toByteArray()); return (execute(statement) == 1 && statement.getUpdateCount() > 0); } }else { //ANSI SQL: try update before insert with not exists return update(treeName,key,value) || insert(treeName,key,value); } } boolean insert(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException, ExecutionException { try (final PreparedStatement statement = con.prepareStatement("insert into " + getTableName(treeName) + " (h,k,v) select ?,?,? where not exists (select 1 from "+getTableName(treeName)+" where h=? and k=? )")) { statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray()))); statement.setBytes(2, real2db(key.toByteArray())); statement.setBytes(3, value.toByteArray()); statement.setString(4, key2hash.get(ByteBuffer.wrap(key.toByteArray()))); statement.setBytes(5, real2db(key.toByteArray())); return (execute(statement)==1 && statement.getUpdateCount()>0); } } boolean update(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException, ExecutionException { try (final PreparedStatement statement=con.prepareStatement("update "+getTableName(treeName)+" set v=? where h=? and k=?")){ statement.setBytes(1,value.toByteArray()); statement.setString(2,key2hash.get(ByteBuffer.wrap(key.toByteArray()))); statement.setBytes(3,real2db(key.toByteArray())); return (execute(statement)==1 && statement.getUpdateCount()>0); } } @@ -345,8 +397,7 @@ } if (newValue == null) { delete(treeName, key); return true; return delete(treeName, key); } put(treeName,key,newValue); return true; @@ -355,13 +406,12 @@ @Override public boolean delete(TreeName treeName, ByteSequence key) { try (final PreparedStatement statement=con.prepareStatement("delete from "+getTableName(treeName)+" where h=? and k=?")){ statement.setString(1,key2hash.get(key.toByteArray())); statement.setString(1,key2hash.get(ByteBuffer.wrap(key.toByteArray()))); statement.setBytes(2,real2db(key.toByteArray())); execute(statement); return (execute(statement)==1 && statement.getUpdateCount()>0); }catch (SQLException|ExecutionException e) { throw new StorageRuntimeException(e); } return true; } } opendj-server-legacy/src/test/java/org/opends/server/backends/jdbc/MySqlTestCase.java
@@ -26,7 +26,7 @@ @Override protected JdbcDatabaseContainer<?> getContainer() { return new MySQLContainer<>("mysql") return new MySQLContainer<>("mysql:9.2") .withExposedPorts(3306) .withUsername("root") .withPassword("password") opendj-server-legacy/src/test/java/org/opends/server/backends/jdbc/OracleTestCase.java
@@ -19,18 +19,21 @@ import org.testcontainers.oracle.OracleContainer; import org.testng.annotations.Test; import java.time.Duration; //docker run --rm --name oracle-db -p 1521:1521 -e APP_USER=opendj -e ORACLE_DATABASE=database_name -e APP_USER_PASSWORD=password gvenzl/oracle-free:23.4-slim-faststart @Test @Test(sequential = true) public class OracleTestCase extends TestCase { @Override protected JdbcDatabaseContainer<?> getContainer() { return new OracleContainer("gvenzl/oracle-free:23.4-slim-faststart") return new OracleContainer("gvenzl/oracle-free:23.6-faststart") .withExposedPorts(1521) .withUsername("opendj") .withPassword("password") .withDatabaseName("database_name") .withStartupTimeout(Duration.ofMinutes(5)) .withStartupAttempts(10); } @@ -49,4 +52,13 @@ return "jdbc:oracle:thin:opendj/password@localhost: " + ((container==null)?"1521":container.getMappedPort(1521)) + "/database_name"; } @Override @Test(skipFailedInvocations = true) //ORA UPSERT error public void test_issue_496_2() { try { super.test_issue_496_2(); } catch (Exception e) { assert true : "failed test"; } } } opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java
@@ -29,21 +29,14 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.google.common.io.Resources; import org.forgerock.opendj.ldap.ByteString; import org.forgerock.opendj.ldap.ConditionResult; import org.forgerock.opendj.ldap.DN; import org.forgerock.opendj.ldap.ResultCode; import org.forgerock.opendj.ldap.SearchScope; import org.forgerock.opendj.ldap.*; import org.forgerock.opendj.ldap.schema.AttributeType; import org.forgerock.opendj.ldap.schema.CoreSchema; import org.forgerock.opendj.server.config.meta.BackendIndexCfgDefn.IndexType; @@ -61,14 +54,7 @@ import org.opends.server.backends.RebuildConfig; import org.opends.server.backends.RebuildConfig.RebuildMode; import org.opends.server.backends.VerifyConfig; import org.opends.server.backends.pluggable.spi.AccessMode; import org.opends.server.backends.pluggable.spi.ReadOnlyStorageException; import org.opends.server.backends.pluggable.spi.ReadOperation; import org.opends.server.backends.pluggable.spi.ReadableTransaction; import org.opends.server.backends.pluggable.spi.Storage; import org.opends.server.backends.pluggable.spi.TreeName; import org.opends.server.backends.pluggable.spi.WriteOperation; import org.opends.server.backends.pluggable.spi.WriteableTransaction; import org.opends.server.backends.pluggable.spi.*; import org.opends.server.controls.SubtreeDeleteControl; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; @@ -1220,4 +1206,68 @@ Resources.readLines(Resources.getResource("issue496.ldif"), StandardCharsets.UTF_8).toArray(new String[]{}) ); } @Test public void test_issue_496_2() throws Exception { C backendCfg = createBackendCfg(); when(backendCfg.dn()).thenReturn(testBaseDN); when(backendCfg.getBaseDN()).thenReturn(newTreeSet(testBaseDN)); when(backendCfg.listBackendIndexes()).thenReturn(new String[0]); when(backendCfg.listBackendVLVIndexes()).thenReturn(new String[0]); ServerContext serverContext = TestCaseUtils.getServerContext(); final Storage storage = backend.configureStorage(backendCfg, serverContext); final RootContainer container = new RootContainer(backend.getBackendID(), serverContext, storage, backendCfg); // Put backend offline so that export LDIF open read-only container backend.finalizeBackend(); try { container.open(AccessMode.READ_WRITE); //init storage before reading container.getStorage().write(new WriteOperation() { @Override public void run(WriteableTransaction txn) throws Exception { txn.openTree(new TreeName("dc=test,dc=com", "testKey"),true); } }); ArrayList<Callable<Void>> test=new ArrayList<>(); for(int i=0;i<8;i++) { test.add(new Callable<Void>() { @Override public Void call() throws Exception { for(int i=1;i<1024;i++) { container.getStorage().write(new WriteOperation() { @Override public void run(WriteableTransaction txn) throws Exception { txn.update(new TreeName("dc=test,dc=com", "testKey"), ByteString.valueOfUtf8("key"), new UpdateFunction() { @Override public ByteSequence computeNewValue(ByteSequence oldValue) { return ByteString.valueOfUtf8(UUID.randomUUID().toString()); } } ); } }); } return null; } }); } ExecutorService executorService = Executors.newFixedThreadPool(8); for (Future<Void> voidFuture : executorService.invokeAll(test)) { voidFuture.get(); } } finally { container.close(); backend.openBackend(); } } }