From 632803028dfe8bf67c23ece38f647c78c103cf9f Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 20 Sep 2013 22:27:44 +0000
Subject: [PATCH] Fix OPENDJ-1121: Closing a connection after closing the connectionfactory causes NPE
---
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java | 58 +++++++++++++++---
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java | 12 +++-
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java | 1
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java | 62 ++++++++++++++++----
4 files changed, 107 insertions(+), 26 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
index d942b51..dff0a0a 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -641,6 +641,7 @@
}
factory.getTimeoutChecker().removeConnection(this);
connection.closeSilently();
+ factory.releaseTransportAndTimeoutChecker();
// Notify listeners.
if (tmpListeners != null) {
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
index 9a60962..91b4c93 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
@@ -35,6 +35,7 @@
import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
@@ -100,6 +101,7 @@
// Give up immediately if the future has been cancelled.
if (future.isCancelled()) {
connection.close();
+ releaseTransportAndTimeoutChecker();
return;
}
@@ -138,7 +140,6 @@
public void failed(final Throwable throwable) {
onFailure(connection, throwable);
}
-
});
} catch (final IOException e) {
onFailure(connection, e);
@@ -150,6 +151,7 @@
public void failed(final Throwable throwable) {
// Adapt and forward.
future.handleErrorResult(adaptConnectionException(throwable));
+ releaseTransportAndTimeoutChecker();
}
@Override
@@ -186,6 +188,7 @@
// Abort connection attempt due to error.
connection.close();
future.handleErrorResult(adaptConnectionException(t));
+ releaseTransportAndTimeoutChecker();
}
private void onSuccess(final LDAPConnection connection) {
@@ -194,6 +197,7 @@
// Close the connection if the future was cancelled.
if (future.isCancelled()) {
connection.close();
+ releaseTransportAndTimeoutChecker();
}
}
}
@@ -202,8 +206,20 @@
private final FilterChain defaultFilterChain;
private final LDAPOptions options;
private final SocketAddress socketAddress;
- private final ReferenceCountedObject<TCPNIOTransport>.Reference transport;
+
+ /**
+ * Prevents the transport and timeoutChecker being released when there are
+ * remaining references (this factory or any connections). It is initially
+ * set to 1 because this factory has a reference.
+ */
+ private final AtomicInteger referenceCount = new AtomicInteger(1);
+
+ /**
+ * Indicates whether this factory has been closed or not.
+ */
private final AtomicBoolean isClosed = new AtomicBoolean();
+
+ private final ReferenceCountedObject<TCPNIOTransport>.Reference transport;
private final ReferenceCountedObject<TimeoutChecker>.Reference timeoutChecker = TIMEOUT_CHECKER
.acquire();
@@ -230,8 +246,7 @@
@Override
public void close() {
if (isClosed.compareAndSet(false, true)) {
- transport.release();
- timeoutChecker.release();
+ releaseTransportAndTimeoutChecker();
}
}
@@ -247,6 +262,7 @@
@Override
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> handler) {
+ acquireTransportAndTimeoutChecker(); // Protect resources.
final SocketConnectorHandler connectorHandler =
TCPNIOConnectorHandler.builder(transport.get()).processor(defaultFilterChain)
.build();
@@ -266,6 +282,15 @@
return socketAddress;
}
+ @Override
+ public String toString() {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("LDAPConnectionFactory(");
+ builder.append(getSocketAddress().toString());
+ builder.append(')');
+ return builder.toString();
+ }
+
TimeoutChecker getTimeoutChecker() {
return timeoutChecker.get();
}
@@ -274,12 +299,23 @@
return options;
}
- @Override
- public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append("LDAPConnectionFactory(");
- builder.append(getSocketAddress().toString());
- builder.append(')');
- return builder.toString();
+ void releaseTransportAndTimeoutChecker() {
+ if (referenceCount.decrementAndGet() == 0) {
+ transport.release();
+ timeoutChecker.release();
+ }
+ }
+
+ private void acquireTransportAndTimeoutChecker() {
+ /*
+ * If the factory is not closed then we need to prevent the resources
+ * (transport, timeout checker) from being released while the connection
+ * attempt is in progress.
+ */
+ referenceCount.incrementAndGet();
+ if (isClosed.get()) {
+ releaseTransportAndTimeoutChecker();
+ throw new IllegalStateException("Attempted to get a connection after factory close");
+ }
}
}
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
index 89c115f..0eb6d38 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -44,6 +44,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.logging.Level;
@@ -190,6 +191,7 @@
connection.close();
connection = null;
}
+ releaseScheduler();
return adaptHeartBeatError(errorResult);
}
@@ -204,7 +206,6 @@
// Create a future which will handle connection result.
this.futureConnectionResult =
new RecursiveFutureResult<Connection, Result>(futureSearchResult) {
-
@Override
protected FutureResult<? extends Result> chainResult(
final Connection innerResult,
@@ -710,6 +711,7 @@
heartBeatFuture.cancel(false);
}
}
+ releaseScheduler();
}
}
@@ -1130,11 +1132,20 @@
* responses) before starting to send heartbeats.
*/
private final long minDelayMS;
+
+ /**
+ * Prevents the scheduler being released when there are remaining references
+ * (this factory or any connections). It is initially set to 1 because this
+ * factory has a reference.
+ */
+ private final AtomicInteger referenceCount = new AtomicInteger(1);
+
/**
* The heartbeat scheduler.
*/
private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
+
/**
* Scheduled task which sends heart beats for all valid idle connections.
*/
@@ -1191,29 +1202,54 @@
}
}
}
- scheduler.release();
+ releaseScheduler();
factory.close();
}
}
+ private void releaseScheduler() {
+ if (referenceCount.decrementAndGet() == 0) {
+ scheduler.release();
+ }
+ }
+
+ private void acquireScheduler() {
+ /*
+ * If the factory is not closed then we need to prevent the scheduler
+ * from being released while the connection attempt is in progress.
+ */
+ referenceCount.incrementAndGet();
+ if (isClosed.get()) {
+ releaseScheduler();
+ throw new IllegalStateException("Attempted to get a connection after factory close");
+ }
+ }
+
@Override
public Connection getConnection() throws ErrorResultException {
/*
* Immediately send a heart beat in order to determine if the connected
* server is responding.
*/
- final Connection connection = factory.getConnection();
- boolean keepConnection = false;
+ acquireScheduler(); // Protect scheduler.
+ boolean succeeded = false;
try {
- connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS,
- TimeUnit.MILLISECONDS);
- keepConnection = true;
- return adaptConnection(connection);
- } catch (final Exception e) {
- throw adaptHeartBeatError(e);
+ final Connection connection = factory.getConnection();
+ try {
+ connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS,
+ TimeUnit.MILLISECONDS);
+ succeeded = true;
+ return adaptConnection(connection);
+ } catch (final Exception e) {
+ throw adaptHeartBeatError(e);
+ } finally {
+ if (!succeeded) {
+ connection.close();
+ }
+ }
} finally {
- if (!keepConnection) {
- connection.close();
+ if (!succeeded) {
+ releaseScheduler();
}
}
}
@@ -1221,6 +1257,8 @@
@Override
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> handler) {
+ acquireScheduler(); // Protect scheduler.
+
// Create a future responsible for chaining the initial heartbeat search.
final ConnectionFutureResultImpl compositeFuture = new ConnectionFutureResultImpl(handler);
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
index 4ad9e45..bc49974 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -27,8 +27,11 @@
package org.forgerock.opendj.ldap;
+import static java.util.Arrays.asList;
import static org.fest.assertions.Assertions.assertThat;
import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool;
+import static org.forgerock.opendj.ldap.Connections.newHeartBeatConnectionFactory;
+import static org.forgerock.opendj.ldap.Connections.newLoadBalancer;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
import static org.forgerock.opendj.ldap.TestCaseUtils.getServerSocketAddress;
@@ -651,15 +654,18 @@
}
}
- @Test(description = "Test for OPENDJ-1121", enabled = false)
+ @Test(description = "Test for OPENDJ-1121: Closing a connection after "
+ + "closing the connection factory causes NPE")
public void testFactoryCloseBeforeConnectionClose() throws Exception {
final ConnectionFactory factory =
- newFixedConnectionPool(new LDAPConnectionFactory(getServerSocketAddress()), 2);
+ newLoadBalancer(new FailoverLoadBalancingAlgorithm(asList(newFixedConnectionPool(
+ newHeartBeatConnectionFactory(new LDAPConnectionFactory(
+ getServerSocketAddress())), 2))));
Connection conn = null;
try {
conn = factory.getConnection();
- factory.close();
} finally {
+ factory.close();
if (conn != null) {
conn.close();
}
--
Gitblit v1.10.0