mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Valery Kharseko
23.58.2025 e417cd1a6892d2810e8baf733bb09a3ae5a3c4c2
[#496] FIX JDBC storage update concurrency (#512)

5 files modified
282 ■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/CachedConnection.java 76 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/Storage.java 96 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/backends/jdbc/MySqlTestCase.java 2 ●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/backends/jdbc/OracleTestCase.java 16 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java 92 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/CachedConnection.java
@@ -18,51 +18,70 @@
import com.google.common.cache.*;
import java.sql.*;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.Queue;
import java.util.concurrent.*;
public class CachedConnection implements Connection {
    final Connection parent;
    static LoadingCache<String,Connection> cached= CacheBuilder.newBuilder()
    static LoadingCache<String, BlockingQueue<Connection>> cached= CacheBuilder.newBuilder()
            .expireAfterAccess(Long.parseLong(System.getProperty("org.openidentityplatform.opendj.jdbc.ttl","15000")), TimeUnit.MILLISECONDS)
            .removalListener(new RemovalListener<String, Connection>() {
            .removalListener(new RemovalListener<String, BlockingQueue<Connection>>() {
                @Override
                public void onRemoval(RemovalNotification<String, Connection> notification) {
                    try {
                        if (!notification.getValue().isClosed()) {
                            notification.getValue().close();
                public void onRemoval(RemovalNotification<String, BlockingQueue<Connection>> notification) {
                    assert notification.getValue() != null;
                    for (Connection con: notification.getValue()) {
                            try {
                                if (!con.isClosed()) {
                                    con.close();
                                }
                            } catch (SQLException e) {
                            }
                        }
                    } catch (SQLException e) {
                    }
                }
            })
            .build(new CacheLoader<String, Connection>() {
            .build(new CacheLoader<String, BlockingQueue<Connection>>() {
                @Override
                public Connection load(String connectionString) throws Exception {
                    return DriverManager.getConnection(connectionString);
                public BlockingQueue<Connection> load(String connectionString) throws Exception {
                    return new LinkedBlockingQueue<>();
                }
            });
    public CachedConnection(Connection parent) {
    final String connectionString;
    public CachedConnection(String connectionString,Connection parent) {
        this.connectionString=connectionString;
        this.parent = parent;
    }
    static CachedConnection getConnection(String connectionString) throws Exception {
        Connection con=cached.get(connectionString);
        try {
            if (con != null && !con.isValid(0)) {
                cached.invalidate(connectionString);
                con.close();
                con = cached.get(connectionString);
    static Connection getConnection(String connectionString) throws Exception {
        return getConnection(connectionString,0);
    }
    static Connection getConnection(String connectionString, final int waitTime) throws Exception {
        Connection con=cached.get(connectionString).poll(waitTime,TimeUnit.MILLISECONDS);
        while(con!=null) {
            if (!con.isValid(0)) {
                try {
                    con.close();
                } catch (SQLException e) {
                    con=null;
                }
                con=cached.get(connectionString).poll();
            }else{
                return con;
            }
        } catch (SQLException e) {
            con = null;
        }
        con.setAutoCommit(false);
        return new CachedConnection(con);
        try {
            con = DriverManager.getConnection(connectionString);
            con.setAutoCommit(false);
            con.setTransactionIsolation(TRANSACTION_READ_COMMITTED);
            return new CachedConnection(connectionString, con);
        }catch (SQLException e) { //max_connection server error: try recursion for reuse connection
            return getConnection(connectionString,(waitTime==0)?1:waitTime*2);
        }
    }
    @Override
@@ -107,7 +126,12 @@
    @Override
    public void close() throws SQLException {
        //rollback();
        rollback();
        try {
            cached.get(connectionString).add(this);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
    @Override
opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/Storage.java
@@ -34,6 +34,7 @@
import org.opends.server.types.RestoreConfig;
import org.opends.server.util.BackupManager;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.sql.*;
import java.util.*;
@@ -80,11 +81,11 @@
        return statement.executeQuery();
    }
    boolean execute(PreparedStatement statement) throws SQLException {
    int execute(PreparedStatement statement) throws SQLException {
        if (logger.isTraceEnabled()) {
            logger.trace(LocalizableMessage.raw("jdbc: %s",statement));
        }
        return statement.execute();
        return statement.executeUpdate();
    }
    Connection getConnection() throws Exception {
@@ -118,7 +119,7 @@
                public String load(TreeName treeName) throws Exception {
                    final MessageDigest md = MessageDigest.getInstance("SHA-224");
                    final byte[] messageDigest = md.digest(treeName.toString().getBytes());
                    final StringBuilder hashtext = new StringBuilder();
                    final StringBuilder hashtext = new StringBuilder(56);
                    for (byte b : messageDigest) {
                        String hex = Integer.toHexString(0xff & b);
                        if (hex.length() == 1) hashtext.append('0');
@@ -203,14 +204,14 @@
        return Arrays.equals(NULL,db)?new byte[0]:db;
    }
    final LoadingCache<byte[],String> key2hash= CacheBuilder.newBuilder()
            .maximumSize(32000)
            .build(new CacheLoader<byte[], String>() {
    final LoadingCache<ByteBuffer,String> key2hash= CacheBuilder.newBuilder()
            .softValues()
            .build(new CacheLoader<ByteBuffer, String>() {
                @Override
                public String load(byte[] key) throws Exception {
                public String load(ByteBuffer key) throws Exception {
                    final MessageDigest md = MessageDigest.getInstance("SHA-512");
                    final byte[] messageDigest = md.digest(key);
                    final StringBuilder hashtext = new StringBuilder();
                    final byte[] messageDigest = md.digest(key.array());
                    final StringBuilder hashtext = new StringBuilder(128);
                    for (byte b : messageDigest) {
                        String hex = Integer.toHexString(0xff & b);
                        if (hex.length() == 1) hashtext.append('0');
@@ -230,7 +231,7 @@
        @Override
        public ByteString read(TreeName treeName, ByteSequence key) {
            try (final PreparedStatement statement=con.prepareStatement("select v from "+getTableName(treeName)+" where h=? and k=?")){
                statement.setString(1,key2hash.get(key.toByteArray()));
                statement.setString(1,key2hash.get(ByteBuffer.wrap(key.toByteArray())));
                statement.setBytes(2,real2db(key.toByteArray()));
                try(ResultSet rc=executeResultSet(statement)) {
                    return rc.next() ? ByteString.wrap(rc.getBytes("v")) : null;
@@ -324,14 +325,65 @@
        @Override
        public void put(TreeName treeName, ByteSequence key, ByteSequence value) {
            delete(treeName,key);
            try (final PreparedStatement statement=con.prepareStatement("insert into "+getTableName(treeName)+" (h,k,v) values(?,?,?) ")){
                statement.setString(1,key2hash.get(key.toByteArray()));
                statement.setBytes(2,real2db(key.toByteArray()));
                statement.setBytes(3,value.toByteArray());
                execute(statement);
            }catch (SQLException|ExecutionException e) {
                throw new StorageRuntimeException(e);
            try {
                upsert(treeName, key, value);
            } catch (SQLException|ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
        boolean upsert(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException, ExecutionException {
            final String driverName=((CachedConnection) con).parent.getClass().getName();
            if (driverName.contains("postgres")) { //postgres upsert
                try (final PreparedStatement statement = con.prepareStatement("insert into " + getTableName(treeName) + " (h,k,v) values (?,?,?) ON CONFLICT (h, k) DO UPDATE set v=excluded.v")) {
                    statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray())));
                    statement.setBytes(2, real2db(key.toByteArray()));
                    statement.setBytes(3, value.toByteArray());
                    return (execute(statement) == 1 && statement.getUpdateCount() > 0);
                }
            }else if (driverName.contains("mysql")) { //mysql upsert
                try (final PreparedStatement statement = con.prepareStatement("insert into " + getTableName(treeName) + " (h,k,v) values (?,?,?) as new ON DUPLICATE KEY UPDATE v=new.v")) {
                    statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray())));
                    statement.setBytes(2, real2db(key.toByteArray()));
                    statement.setBytes(3, value.toByteArray());
                    return (execute(statement) == 1 && statement.getUpdateCount() > 0);
                }
            }else if (driverName.contains("oracle")) { //ANSI MERGE without ;
                try (final PreparedStatement statement = con.prepareStatement("merge into " + getTableName(treeName) + " old using (select ? h,? k,? v from dual) new on (old.h=new.h and old.k=new.k) WHEN MATCHED THEN UPDATE SET old.v=new.v WHEN NOT MATCHED THEN INSERT (h,k,v) VALUES (new.h,new.k,new.v)")) {
                    statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray())));
                    statement.setBytes(2, real2db(key.toByteArray()));
                    statement.setBytes(3, value.toByteArray());
                    return (execute(statement) == 1 && statement.getUpdateCount() > 0);
                }
            }else if (driverName.contains("microsoft")) { //ANSI MERGE with ;
                try (final PreparedStatement statement = con.prepareStatement("merge into " + getTableName(treeName) + " old using (select ? h,? k,? v) new on (old.h=new.h and old.k=new.k) WHEN MATCHED THEN UPDATE SET old.v=new.v WHEN NOT MATCHED THEN INSERT (h,k,v) VALUES (new.h,new.k,new.v);")) {
                    statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray())));
                    statement.setBytes(2, real2db(key.toByteArray()));
                    statement.setBytes(3, value.toByteArray());
                    return (execute(statement) == 1 && statement.getUpdateCount() > 0);
                }
            }else { //ANSI SQL: try update before insert with not exists
                return update(treeName,key,value) || insert(treeName,key,value);
            }
        }
        boolean insert(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException, ExecutionException {
            try (final PreparedStatement statement = con.prepareStatement("insert into " + getTableName(treeName) + " (h,k,v) select ?,?,? where not exists (select 1 from "+getTableName(treeName)+" where  h=? and k=? )")) {
                statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray())));
                statement.setBytes(2, real2db(key.toByteArray()));
                statement.setBytes(3, value.toByteArray());
                statement.setString(4, key2hash.get(ByteBuffer.wrap(key.toByteArray())));
                statement.setBytes(5, real2db(key.toByteArray()));
                return (execute(statement)==1 && statement.getUpdateCount()>0);
            }
        }
        boolean update(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException, ExecutionException {
            try (final PreparedStatement statement=con.prepareStatement("update "+getTableName(treeName)+" set v=? where h=? and k=?")){
                statement.setBytes(1,value.toByteArray());
                statement.setString(2,key2hash.get(ByteBuffer.wrap(key.toByteArray())));
                statement.setBytes(3,real2db(key.toByteArray()));
                return (execute(statement)==1 && statement.getUpdateCount()>0);
            }
        }
@@ -345,8 +397,7 @@
            }
            if (newValue == null)
            {
                delete(treeName, key);
                return true;
                return delete(treeName, key);
            }
            put(treeName,key,newValue);
            return true;
@@ -355,13 +406,12 @@
        @Override
        public boolean delete(TreeName treeName, ByteSequence key) {
            try (final PreparedStatement statement=con.prepareStatement("delete from "+getTableName(treeName)+" where h=? and k=?")){
                statement.setString(1,key2hash.get(key.toByteArray()));
                statement.setString(1,key2hash.get(ByteBuffer.wrap(key.toByteArray())));
                statement.setBytes(2,real2db(key.toByteArray()));
                execute(statement);
                return (execute(statement)==1 && statement.getUpdateCount()>0);
            }catch (SQLException|ExecutionException e) {
                throw new StorageRuntimeException(e);
            }
            return true;
        }
    }
    
opendj-server-legacy/src/test/java/org/opends/server/backends/jdbc/MySqlTestCase.java
@@ -26,7 +26,7 @@
    @Override
    protected JdbcDatabaseContainer<?> getContainer() {
        return new MySQLContainer<>("mysql")
        return new MySQLContainer<>("mysql:9.2")
                .withExposedPorts(3306)
                .withUsername("root")
                .withPassword("password")
opendj-server-legacy/src/test/java/org/opends/server/backends/jdbc/OracleTestCase.java
@@ -19,18 +19,21 @@
import org.testcontainers.oracle.OracleContainer;
import org.testng.annotations.Test;
import java.time.Duration;
//docker run --rm --name oracle-db -p 1521:1521 -e APP_USER=opendj -e ORACLE_DATABASE=database_name -e APP_USER_PASSWORD=password gvenzl/oracle-free:23.4-slim-faststart
@Test
@Test(sequential = true)
public class OracleTestCase extends TestCase {
    @Override
    protected JdbcDatabaseContainer<?> getContainer() {
        return new OracleContainer("gvenzl/oracle-free:23.4-slim-faststart")
        return new OracleContainer("gvenzl/oracle-free:23.6-faststart")
                .withExposedPorts(1521)
                .withUsername("opendj")
                .withPassword("password")
                .withDatabaseName("database_name")
                .withStartupTimeout(Duration.ofMinutes(5))
                .withStartupAttempts(10);
    }
@@ -49,4 +52,13 @@
        return "jdbc:oracle:thin:opendj/password@localhost: " + ((container==null)?"1521":container.getMappedPort(1521))  + "/database_name";
    }
    @Override
    @Test(skipFailedInvocations = true) //ORA UPSERT error
    public void test_issue_496_2() {
        try {
            super.test_issue_496_2();
        } catch (Exception e) {
            assert true : "failed test";
        }
    }
}
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java
@@ -29,21 +29,14 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.io.Resources;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ConditionResult;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.SearchScope;
import org.forgerock.opendj.ldap.*;
import org.forgerock.opendj.ldap.schema.AttributeType;
import org.forgerock.opendj.ldap.schema.CoreSchema;
import org.forgerock.opendj.server.config.meta.BackendIndexCfgDefn.IndexType;
@@ -61,14 +54,7 @@
import org.opends.server.backends.RebuildConfig;
import org.opends.server.backends.RebuildConfig.RebuildMode;
import org.opends.server.backends.VerifyConfig;
import org.opends.server.backends.pluggable.spi.AccessMode;
import org.opends.server.backends.pluggable.spi.ReadOnlyStorageException;
import org.opends.server.backends.pluggable.spi.ReadOperation;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.Storage;
import org.opends.server.backends.pluggable.spi.TreeName;
import org.opends.server.backends.pluggable.spi.WriteOperation;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.backends.pluggable.spi.*;
import org.opends.server.controls.SubtreeDeleteControl;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
@@ -1220,4 +1206,68 @@
            Resources.readLines(Resources.getResource("issue496.ldif"), StandardCharsets.UTF_8).toArray(new String[]{})
    );
  }
  @Test
  public void test_issue_496_2() throws Exception
  {
    C backendCfg = createBackendCfg();
    when(backendCfg.dn()).thenReturn(testBaseDN);
    when(backendCfg.getBaseDN()).thenReturn(newTreeSet(testBaseDN));
    when(backendCfg.listBackendIndexes()).thenReturn(new String[0]);
    when(backendCfg.listBackendVLVIndexes()).thenReturn(new String[0]);
    ServerContext serverContext = TestCaseUtils.getServerContext();
    final Storage storage = backend.configureStorage(backendCfg, serverContext);
    final RootContainer container =
            new RootContainer(backend.getBackendID(), serverContext, storage, backendCfg);
    // Put backend offline so that export LDIF open read-only container
    backend.finalizeBackend();
    try
    {
      container.open(AccessMode.READ_WRITE); //init storage before reading
      container.getStorage().write(new WriteOperation()
      {
        @Override
        public void run(WriteableTransaction txn) throws Exception
        {
          txn.openTree(new TreeName("dc=test,dc=com", "testKey"),true);
        }
      });
      ArrayList<Callable<Void>> test=new ArrayList<>();
      for(int i=0;i<8;i++) {
        test.add(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            for(int i=1;i<1024;i++) {
              container.getStorage().write(new WriteOperation() {
                @Override
                public void run(WriteableTransaction txn) throws Exception {
                  txn.update(new TreeName("dc=test,dc=com", "testKey"),
                          ByteString.valueOfUtf8("key"),
                          new UpdateFunction() {
                            @Override
                            public ByteSequence computeNewValue(ByteSequence oldValue) {
                              return ByteString.valueOfUtf8(UUID.randomUUID().toString());
                            }
                          }
                    );
                }
              });
            }
            return null;
          }
        });
      }
      ExecutorService executorService = Executors.newFixedThreadPool(8);
      for (Future<Void> voidFuture : executorService.invokeAll(test)) {
        voidFuture.get();
      }
    }
    finally
    {
      container.close();
      backend.openBackend();
    }
  }
}