From f51e4456baf7d5538f8d5e06dddba6aa25c67b33 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 20 Sep 2013 22:30:19 +0000
Subject: [PATCH] Backport fix OPENDJ-1121: Closing a connection after closing the connectionfactory causes NPE

---
 opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java  |   22 +++++++
 opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java  |   58 +++++++++++++++---
 opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java |   62 ++++++++++++++++----
 opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java             |    1 
 4 files changed, 120 insertions(+), 23 deletions(-)

diff --git a/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java b/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
index 0ca678f..d82915d 100644
--- a/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
+++ b/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -642,6 +642,7 @@
         }
         factory.getTimeoutChecker().removeConnection(this);
         connection.closeSilently();
+        factory.releaseTransportAndTimeoutChecker();
 
         // Notify listeners.
         if (tmpListeners != null) {
diff --git a/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java b/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
index 9a60962..91b4c93 100644
--- a/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
+++ b/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
@@ -35,6 +35,7 @@
 import java.net.SocketAddress;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SSLEngine;
 
@@ -100,6 +101,7 @@
             // Give up immediately if the future has been cancelled.
             if (future.isCancelled()) {
                 connection.close();
+                releaseTransportAndTimeoutChecker();
                 return;
             }
 
@@ -138,7 +140,6 @@
                                 public void failed(final Throwable throwable) {
                                     onFailure(connection, throwable);
                                 }
-
                             });
                 } catch (final IOException e) {
                     onFailure(connection, e);
@@ -150,6 +151,7 @@
         public void failed(final Throwable throwable) {
             // Adapt and forward.
             future.handleErrorResult(adaptConnectionException(throwable));
+            releaseTransportAndTimeoutChecker();
         }
 
         @Override
@@ -186,6 +188,7 @@
             // Abort connection attempt due to error.
             connection.close();
             future.handleErrorResult(adaptConnectionException(t));
+            releaseTransportAndTimeoutChecker();
         }
 
         private void onSuccess(final LDAPConnection connection) {
@@ -194,6 +197,7 @@
             // Close the connection if the future was cancelled.
             if (future.isCancelled()) {
                 connection.close();
+                releaseTransportAndTimeoutChecker();
             }
         }
     }
@@ -202,8 +206,20 @@
     private final FilterChain defaultFilterChain;
     private final LDAPOptions options;
     private final SocketAddress socketAddress;
-    private final ReferenceCountedObject<TCPNIOTransport>.Reference transport;
+
+    /**
+     * Prevents the transport and timeoutChecker being released when there are
+     * remaining references (this factory or any connections). It is initially
+     * set to 1 because this factory has a reference.
+     */
+    private final AtomicInteger referenceCount = new AtomicInteger(1);
+
+    /**
+     * Indicates whether this factory has been closed or not.
+     */
     private final AtomicBoolean isClosed = new AtomicBoolean();
+
+    private final ReferenceCountedObject<TCPNIOTransport>.Reference transport;
     private final ReferenceCountedObject<TimeoutChecker>.Reference timeoutChecker = TIMEOUT_CHECKER
             .acquire();
 
@@ -230,8 +246,7 @@
     @Override
     public void close() {
         if (isClosed.compareAndSet(false, true)) {
-            transport.release();
-            timeoutChecker.release();
+            releaseTransportAndTimeoutChecker();
         }
     }
 
@@ -247,6 +262,7 @@
     @Override
     public FutureResult<Connection> getConnectionAsync(
             final ResultHandler<? super Connection> handler) {
+        acquireTransportAndTimeoutChecker(); // Protect resources.
         final SocketConnectorHandler connectorHandler =
                 TCPNIOConnectorHandler.builder(transport.get()).processor(defaultFilterChain)
                         .build();
@@ -266,6 +282,15 @@
         return socketAddress;
     }
 
+    @Override
+    public String toString() {
+        final StringBuilder builder = new StringBuilder();
+        builder.append("LDAPConnectionFactory(");
+        builder.append(getSocketAddress().toString());
+        builder.append(')');
+        return builder.toString();
+    }
+
     TimeoutChecker getTimeoutChecker() {
         return timeoutChecker.get();
     }
@@ -274,12 +299,23 @@
         return options;
     }
 
-    @Override
-    public String toString() {
-        final StringBuilder builder = new StringBuilder();
-        builder.append("LDAPConnectionFactory(");
-        builder.append(getSocketAddress().toString());
-        builder.append(')');
-        return builder.toString();
+    void releaseTransportAndTimeoutChecker() {
+        if (referenceCount.decrementAndGet() == 0) {
+            transport.release();
+            timeoutChecker.release();
+        }
+    }
+
+    private void acquireTransportAndTimeoutChecker() {
+        /*
+         * If the factory is not closed then we need to prevent the resources
+         * (transport, timeout checker) from being released while the connection
+         * attempt is in progress.
+         */
+        referenceCount.incrementAndGet();
+        if (isClosed.get()) {
+            releaseTransportAndTimeoutChecker();
+            throw new IllegalStateException("Attempted to get a connection after factory close");
+        }
     }
 }
diff --git a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
index 89c115f..0eb6d38 100644
--- a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
+++ b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -44,6 +44,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 import java.util.logging.Level;
 
@@ -190,6 +191,7 @@
                         connection.close();
                         connection = null;
                     }
+                    releaseScheduler();
                     return adaptHeartBeatError(errorResult);
                 }
 
@@ -204,7 +206,6 @@
             // Create a future which will handle connection result.
             this.futureConnectionResult =
                     new RecursiveFutureResult<Connection, Result>(futureSearchResult) {
-
                         @Override
                         protected FutureResult<? extends Result> chainResult(
                                 final Connection innerResult,
@@ -710,6 +711,7 @@
                         heartBeatFuture.cancel(false);
                     }
                 }
+                releaseScheduler();
             }
         }
 
@@ -1130,11 +1132,20 @@
      * responses) before starting to send heartbeats.
      */
     private final long minDelayMS;
+
+    /**
+     * Prevents the scheduler being released when there are remaining references
+     * (this factory or any connections). It is initially set to 1 because this
+     * factory has a reference.
+     */
+    private final AtomicInteger referenceCount = new AtomicInteger(1);
+
     /**
      * The heartbeat scheduler.
      */
     private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
 
+
     /**
      * Scheduled task which sends heart beats for all valid idle connections.
      */
@@ -1191,29 +1202,54 @@
                     }
                 }
             }
-            scheduler.release();
+            releaseScheduler();
             factory.close();
         }
     }
 
+    private void releaseScheduler() {
+        if (referenceCount.decrementAndGet() == 0) {
+            scheduler.release();
+        }
+    }
+
+    private void acquireScheduler() {
+        /*
+         * If the factory is not closed then we need to prevent the scheduler
+         * from being released while the connection attempt is in progress.
+         */
+        referenceCount.incrementAndGet();
+        if (isClosed.get()) {
+            releaseScheduler();
+            throw new IllegalStateException("Attempted to get a connection after factory close");
+        }
+    }
+
     @Override
     public Connection getConnection() throws ErrorResultException {
         /*
          * Immediately send a heart beat in order to determine if the connected
          * server is responding.
          */
-        final Connection connection = factory.getConnection();
-        boolean keepConnection = false;
+        acquireScheduler(); // Protect scheduler.
+        boolean succeeded = false;
         try {
-            connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS,
-                    TimeUnit.MILLISECONDS);
-            keepConnection = true;
-            return adaptConnection(connection);
-        } catch (final Exception e) {
-            throw adaptHeartBeatError(e);
+            final Connection connection = factory.getConnection();
+            try {
+                connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS,
+                        TimeUnit.MILLISECONDS);
+                succeeded = true;
+                return adaptConnection(connection);
+            } catch (final Exception e) {
+                throw adaptHeartBeatError(e);
+            } finally {
+                if (!succeeded) {
+                    connection.close();
+                }
+            }
         } finally {
-            if (!keepConnection) {
-                connection.close();
+            if (!succeeded) {
+                releaseScheduler();
             }
         }
     }
@@ -1221,6 +1257,8 @@
     @Override
     public FutureResult<Connection> getConnectionAsync(
             final ResultHandler<? super Connection> handler) {
+        acquireScheduler(); // Protect scheduler.
+
         // Create a future responsible for chaining the initial heartbeat search.
         final ConnectionFutureResultImpl compositeFuture = new ConnectionFutureResultImpl(handler);
 
diff --git a/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java b/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
index 9c2ec9a..bc49974 100644
--- a/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
+++ b/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -27,7 +27,11 @@
 
 package org.forgerock.opendj.ldap;
 
+import static java.util.Arrays.asList;
 import static org.fest.assertions.Assertions.assertThat;
+import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool;
+import static org.forgerock.opendj.ldap.Connections.newHeartBeatConnectionFactory;
+import static org.forgerock.opendj.ldap.Connections.newLoadBalancer;
 import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
 import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
 import static org.forgerock.opendj.ldap.TestCaseUtils.getServerSocketAddress;
@@ -650,6 +654,24 @@
         }
     }
 
+    @Test(description = "Test for OPENDJ-1121: Closing a connection after "
+            + "closing the connection factory causes NPE")
+    public void testFactoryCloseBeforeConnectionClose() throws Exception {
+        final ConnectionFactory factory =
+                newLoadBalancer(new FailoverLoadBalancingAlgorithm(asList(newFixedConnectionPool(
+                        newHeartBeatConnectionFactory(new LDAPConnectionFactory(
+                                getServerSocketAddress())), 2))));
+        Connection conn = null;
+        try {
+            conn = factory.getConnection();
+        } finally {
+            factory.close();
+            if (conn != null) {
+                conn.close();
+            }
+        }
+    }
+
     private void waitForCondition(Callable<Boolean> condition) throws Exception {
         long timeout = System.currentTimeMillis() + TEST_TIMEOUT_MS;
         while (!condition.call()) {

--
Gitblit v1.10.0