From e417cd1a6892d2810e8baf733bb09a3ae5a3c4c2 Mon Sep 17 00:00:00 2001
From: Valery Kharseko <vharseko@3a-systems.ru>
Date: Wed, 23 Apr 2025 07:58:07 +0000
Subject: [PATCH] [#496] FIX JDBC storage update concurrency (#512)

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/CachedConnection.java |   76 +++++++++++++++++++++++++-------------
 1 files changed, 50 insertions(+), 26 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/CachedConnection.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/CachedConnection.java
index ede8c13..1652ecc 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/CachedConnection.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/jdbc/CachedConnection.java
@@ -18,51 +18,70 @@
 import com.google.common.cache.*;
 
 import java.sql.*;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
+import java.util.Queue;
+import java.util.concurrent.*;
 
 public class CachedConnection implements Connection {
     final Connection parent;
 
-    static LoadingCache<String,Connection> cached= CacheBuilder.newBuilder()
+    static LoadingCache<String, BlockingQueue<Connection>> cached= CacheBuilder.newBuilder()
             .expireAfterAccess(Long.parseLong(System.getProperty("org.openidentityplatform.opendj.jdbc.ttl","15000")), TimeUnit.MILLISECONDS)
-            .removalListener(new RemovalListener<String, Connection>() {
+            .removalListener(new RemovalListener<String, BlockingQueue<Connection>>() {
                 @Override
-                public void onRemoval(RemovalNotification<String, Connection> notification) {
-                    try {
-                        if (!notification.getValue().isClosed()) {
-                            notification.getValue().close();
+                public void onRemoval(RemovalNotification<String, BlockingQueue<Connection>> notification) {
+                    assert notification.getValue() != null;
+                    for (Connection con: notification.getValue()) {
+                            try {
+                                if (!con.isClosed()) {
+                                    con.close();
+                                }
+                            } catch (SQLException e) {
+                            }
                         }
-                    } catch (SQLException e) {
-                    }
                 }
             })
-            .build(new CacheLoader<String, Connection>() {
+            .build(new CacheLoader<String, BlockingQueue<Connection>>() {
                 @Override
-                public Connection load(String connectionString) throws Exception {
-                    return DriverManager.getConnection(connectionString);
+                public BlockingQueue<Connection> load(String connectionString) throws Exception {
+                    return new LinkedBlockingQueue<>();
                 }
             });
 
-    public CachedConnection(Connection parent) {
+    final String connectionString;
+    public CachedConnection(String connectionString,Connection parent) {
+        this.connectionString=connectionString;
         this.parent = parent;
     }
 
-    static CachedConnection getConnection(String connectionString) throws Exception {
-        Connection con=cached.get(connectionString);
-        try {
-            if (con != null && !con.isValid(0)) {
-                cached.invalidate(connectionString);
-                con.close();
-                con = cached.get(connectionString);
+    static Connection getConnection(String connectionString) throws Exception {
+        return getConnection(connectionString,0);
+    }
+
+    static Connection getConnection(String connectionString, final int waitTime) throws Exception {
+        Connection con=cached.get(connectionString).poll(waitTime,TimeUnit.MILLISECONDS);
+        while(con!=null) {
+            if (!con.isValid(0)) {
+                try {
+                    con.close();
+                } catch (SQLException e) {
+                    con=null;
+                }
+                con=cached.get(connectionString).poll();
+            }else{
+                return con;
             }
-        } catch (SQLException e) {
-            con = null;
         }
-        con.setAutoCommit(false);
-        return new CachedConnection(con);
+        try {
+            con = DriverManager.getConnection(connectionString);
+            con.setAutoCommit(false);
+            con.setTransactionIsolation(TRANSACTION_READ_COMMITTED);
+            return new CachedConnection(connectionString, con);
+        }catch (SQLException e) { //max_connection server error: try recursion for reuse connection
+            return getConnection(connectionString,(waitTime==0)?1:waitTime*2);
+        }
     }
 
     @Override
@@ -107,7 +126,12 @@
 
     @Override
     public void close() throws SQLException {
-        //rollback();
+        rollback();
+        try {
+            cached.get(connectionString).add(this);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override

--
Gitblit v1.10.0