From e417cd1a6892d2810e8baf733bb09a3ae5a3c4c2 Mon Sep 17 00:00:00 2001
From: Valery Kharseko <vharseko@3a-systems.ru>
Date: Wed, 23 Apr 2025 07:58:07 +0000
Subject: [PATCH] [#496] FIX JDBC storage update concurrency (#512)
---
opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/Storage.java | 96 ++++++++++++++++++++++++++++++++++++-----------
1 files changed, 73 insertions(+), 23 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/Storage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/Storage.java
index 684c866..243daa1 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/Storage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/Storage.java
@@ -34,6 +34,7 @@
import org.opends.server.types.RestoreConfig;
import org.opends.server.util.BackupManager;
+import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.sql.*;
import java.util.*;
@@ -80,11 +81,11 @@
return statement.executeQuery();
}
- boolean execute(PreparedStatement statement) throws SQLException {
+ int execute(PreparedStatement statement) throws SQLException {
if (logger.isTraceEnabled()) {
logger.trace(LocalizableMessage.raw("jdbc: %s",statement));
}
- return statement.execute();
+ return statement.executeUpdate();
}
Connection getConnection() throws Exception {
@@ -118,7 +119,7 @@
public String load(TreeName treeName) throws Exception {
final MessageDigest md = MessageDigest.getInstance("SHA-224");
final byte[] messageDigest = md.digest(treeName.toString().getBytes());
- final StringBuilder hashtext = new StringBuilder();
+ final StringBuilder hashtext = new StringBuilder(56);
for (byte b : messageDigest) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1) hashtext.append('0');
@@ -203,14 +204,14 @@
return Arrays.equals(NULL,db)?new byte[0]:db;
}
- final LoadingCache<byte[],String> key2hash= CacheBuilder.newBuilder()
- .maximumSize(32000)
- .build(new CacheLoader<byte[], String>() {
+ final LoadingCache<ByteBuffer,String> key2hash= CacheBuilder.newBuilder()
+ .softValues()
+ .build(new CacheLoader<ByteBuffer, String>() {
@Override
- public String load(byte[] key) throws Exception {
+ public String load(ByteBuffer key) throws Exception {
final MessageDigest md = MessageDigest.getInstance("SHA-512");
- final byte[] messageDigest = md.digest(key);
- final StringBuilder hashtext = new StringBuilder();
+ final byte[] messageDigest = md.digest(key.array());
+ final StringBuilder hashtext = new StringBuilder(128);
for (byte b : messageDigest) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1) hashtext.append('0');
@@ -230,7 +231,7 @@
@Override
public ByteString read(TreeName treeName, ByteSequence key) {
try (final PreparedStatement statement=con.prepareStatement("select v from "+getTableName(treeName)+" where h=? and k=?")){
- statement.setString(1,key2hash.get(key.toByteArray()));
+ statement.setString(1,key2hash.get(ByteBuffer.wrap(key.toByteArray())));
statement.setBytes(2,real2db(key.toByteArray()));
try(ResultSet rc=executeResultSet(statement)) {
return rc.next() ? ByteString.wrap(rc.getBytes("v")) : null;
@@ -324,14 +325,65 @@
@Override
public void put(TreeName treeName, ByteSequence key, ByteSequence value) {
- delete(treeName,key);
- try (final PreparedStatement statement=con.prepareStatement("insert into "+getTableName(treeName)+" (h,k,v) values(?,?,?) ")){
- statement.setString(1,key2hash.get(key.toByteArray()));
- statement.setBytes(2,real2db(key.toByteArray()));
- statement.setBytes(3,value.toByteArray());
- execute(statement);
- }catch (SQLException|ExecutionException e) {
- throw new StorageRuntimeException(e);
+ try {
+ upsert(treeName, key, value);
+ } catch (SQLException|ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ boolean upsert(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException, ExecutionException {
+ final String driverName=((CachedConnection) con).parent.getClass().getName();
+ if (driverName.contains("postgres")) { //postgres upsert
+ try (final PreparedStatement statement = con.prepareStatement("insert into " + getTableName(treeName) + " (h,k,v) values (?,?,?) ON CONFLICT (h, k) DO UPDATE set v=excluded.v")) {
+ statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray())));
+ statement.setBytes(2, real2db(key.toByteArray()));
+ statement.setBytes(3, value.toByteArray());
+ return (execute(statement) == 1 && statement.getUpdateCount() > 0);
+ }
+ }else if (driverName.contains("mysql")) { //mysql upsert
+ try (final PreparedStatement statement = con.prepareStatement("insert into " + getTableName(treeName) + " (h,k,v) values (?,?,?) as new ON DUPLICATE KEY UPDATE v=new.v")) {
+ statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray())));
+ statement.setBytes(2, real2db(key.toByteArray()));
+ statement.setBytes(3, value.toByteArray());
+ return (execute(statement) == 1 && statement.getUpdateCount() > 0);
+ }
+ }else if (driverName.contains("oracle")) { //ANSI MERGE without ;
+ try (final PreparedStatement statement = con.prepareStatement("merge into " + getTableName(treeName) + " old using (select ? h,? k,? v from dual) new on (old.h=new.h and old.k=new.k) WHEN MATCHED THEN UPDATE SET old.v=new.v WHEN NOT MATCHED THEN INSERT (h,k,v) VALUES (new.h,new.k,new.v)")) {
+ statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray())));
+ statement.setBytes(2, real2db(key.toByteArray()));
+ statement.setBytes(3, value.toByteArray());
+ return (execute(statement) == 1 && statement.getUpdateCount() > 0);
+ }
+ }else if (driverName.contains("microsoft")) { //ANSI MERGE with ;
+ try (final PreparedStatement statement = con.prepareStatement("merge into " + getTableName(treeName) + " old using (select ? h,? k,? v) new on (old.h=new.h and old.k=new.k) WHEN MATCHED THEN UPDATE SET old.v=new.v WHEN NOT MATCHED THEN INSERT (h,k,v) VALUES (new.h,new.k,new.v);")) {
+ statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray())));
+ statement.setBytes(2, real2db(key.toByteArray()));
+ statement.setBytes(3, value.toByteArray());
+ return (execute(statement) == 1 && statement.getUpdateCount() > 0);
+ }
+ }else { //ANSI SQL: try update before insert with not exists
+ return update(treeName,key,value) || insert(treeName,key,value);
+ }
+ }
+
+ boolean insert(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException, ExecutionException {
+ try (final PreparedStatement statement = con.prepareStatement("insert into " + getTableName(treeName) + " (h,k,v) select ?,?,? where not exists (select 1 from "+getTableName(treeName)+" where h=? and k=? )")) {
+ statement.setString(1, key2hash.get(ByteBuffer.wrap(key.toByteArray())));
+ statement.setBytes(2, real2db(key.toByteArray()));
+ statement.setBytes(3, value.toByteArray());
+ statement.setString(4, key2hash.get(ByteBuffer.wrap(key.toByteArray())));
+ statement.setBytes(5, real2db(key.toByteArray()));
+ return (execute(statement)==1 && statement.getUpdateCount()>0);
+ }
+ }
+
+ boolean update(TreeName treeName, ByteSequence key, ByteSequence value) throws SQLException, ExecutionException {
+ try (final PreparedStatement statement=con.prepareStatement("update "+getTableName(treeName)+" set v=? where h=? and k=?")){
+ statement.setBytes(1,value.toByteArray());
+ statement.setString(2,key2hash.get(ByteBuffer.wrap(key.toByteArray())));
+ statement.setBytes(3,real2db(key.toByteArray()));
+ return (execute(statement)==1 && statement.getUpdateCount()>0);
}
}
@@ -345,8 +397,7 @@
}
if (newValue == null)
{
- delete(treeName, key);
- return true;
+ return delete(treeName, key);
}
put(treeName,key,newValue);
return true;
@@ -355,13 +406,12 @@
@Override
public boolean delete(TreeName treeName, ByteSequence key) {
try (final PreparedStatement statement=con.prepareStatement("delete from "+getTableName(treeName)+" where h=? and k=?")){
- statement.setString(1,key2hash.get(key.toByteArray()));
+ statement.setString(1,key2hash.get(ByteBuffer.wrap(key.toByteArray())));
statement.setBytes(2,real2db(key.toByteArray()));
- execute(statement);
+ return (execute(statement)==1 && statement.getUpdateCount()>0);
}catch (SQLException|ExecutionException e) {
throw new StorageRuntimeException(e);
}
- return true;
}
}
--
Gitblit v1.10.0