opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -641,6 +641,7 @@ } factory.getTimeoutChecker().removeConnection(this); connection.closeSilently(); factory.releaseTransportAndTimeoutChecker(); // Notify listeners. if (tmpListeners != null) { opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
@@ -35,6 +35,7 @@ import java.net.SocketAddress; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLEngine; @@ -100,6 +101,7 @@ // Give up immediately if the future has been cancelled. if (future.isCancelled()) { connection.close(); releaseTransportAndTimeoutChecker(); return; } @@ -138,7 +140,6 @@ public void failed(final Throwable throwable) { onFailure(connection, throwable); } }); } catch (final IOException e) { onFailure(connection, e); @@ -150,6 +151,7 @@ public void failed(final Throwable throwable) { // Adapt and forward. future.handleErrorResult(adaptConnectionException(throwable)); releaseTransportAndTimeoutChecker(); } @Override @@ -186,6 +188,7 @@ // Abort connection attempt due to error. connection.close(); future.handleErrorResult(adaptConnectionException(t)); releaseTransportAndTimeoutChecker(); } private void onSuccess(final LDAPConnection connection) { @@ -194,6 +197,7 @@ // Close the connection if the future was cancelled. if (future.isCancelled()) { connection.close(); releaseTransportAndTimeoutChecker(); } } } @@ -202,8 +206,20 @@ private final FilterChain defaultFilterChain; private final LDAPOptions options; private final SocketAddress socketAddress; private final ReferenceCountedObject<TCPNIOTransport>.Reference transport; /** * Prevents the transport and timeoutChecker being released when there are * remaining references (this factory or any connections). It is initially * set to 1 because this factory has a reference. */ private final AtomicInteger referenceCount = new AtomicInteger(1); /** * Indicates whether this factory has been closed or not. */ private final AtomicBoolean isClosed = new AtomicBoolean(); private final ReferenceCountedObject<TCPNIOTransport>.Reference transport; private final ReferenceCountedObject<TimeoutChecker>.Reference timeoutChecker = TIMEOUT_CHECKER .acquire(); @@ -230,8 +246,7 @@ @Override public void close() { if (isClosed.compareAndSet(false, true)) { transport.release(); timeoutChecker.release(); releaseTransportAndTimeoutChecker(); } } @@ -247,6 +262,7 @@ @Override public FutureResult<Connection> getConnectionAsync( final ResultHandler<? super Connection> handler) { acquireTransportAndTimeoutChecker(); // Protect resources. final SocketConnectorHandler connectorHandler = TCPNIOConnectorHandler.builder(transport.get()).processor(defaultFilterChain) .build(); @@ -266,6 +282,15 @@ return socketAddress; } @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("LDAPConnectionFactory("); builder.append(getSocketAddress().toString()); builder.append(')'); return builder.toString(); } TimeoutChecker getTimeoutChecker() { return timeoutChecker.get(); } @@ -274,12 +299,23 @@ return options; } @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("LDAPConnectionFactory("); builder.append(getSocketAddress().toString()); builder.append(')'); return builder.toString(); void releaseTransportAndTimeoutChecker() { if (referenceCount.decrementAndGet() == 0) { transport.release(); timeoutChecker.release(); } } private void acquireTransportAndTimeoutChecker() { /* * If the factory is not closed then we need to prevent the resources * (transport, timeout checker) from being released while the connection * attempt is in progress. */ referenceCount.incrementAndGet(); if (isClosed.get()) { releaseTransportAndTimeoutChecker(); throw new IllegalStateException("Attempted to get a connection after factory close"); } } } opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -44,6 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.logging.Level; @@ -190,6 +191,7 @@ connection.close(); connection = null; } releaseScheduler(); return adaptHeartBeatError(errorResult); } @@ -204,7 +206,6 @@ // Create a future which will handle connection result. this.futureConnectionResult = new RecursiveFutureResult<Connection, Result>(futureSearchResult) { @Override protected FutureResult<? extends Result> chainResult( final Connection innerResult, @@ -710,6 +711,7 @@ heartBeatFuture.cancel(false); } } releaseScheduler(); } } @@ -1130,11 +1132,20 @@ * responses) before starting to send heartbeats. */ private final long minDelayMS; /** * Prevents the scheduler being released when there are remaining references * (this factory or any connections). It is initially set to 1 because this * factory has a reference. */ private final AtomicInteger referenceCount = new AtomicInteger(1); /** * The heartbeat scheduler. */ private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler; /** * Scheduled task which sends heart beats for all valid idle connections. */ @@ -1191,36 +1202,63 @@ } } } scheduler.release(); releaseScheduler(); factory.close(); } } private void releaseScheduler() { if (referenceCount.decrementAndGet() == 0) { scheduler.release(); } } private void acquireScheduler() { /* * If the factory is not closed then we need to prevent the scheduler * from being released while the connection attempt is in progress. */ referenceCount.incrementAndGet(); if (isClosed.get()) { releaseScheduler(); throw new IllegalStateException("Attempted to get a connection after factory close"); } } @Override public Connection getConnection() throws ErrorResultException { /* * Immediately send a heart beat in order to determine if the connected * server is responding. */ acquireScheduler(); // Protect scheduler. boolean succeeded = false; try { final Connection connection = factory.getConnection(); boolean keepConnection = false; try { connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS, TimeUnit.MILLISECONDS); keepConnection = true; succeeded = true; return adaptConnection(connection); } catch (final Exception e) { throw adaptHeartBeatError(e); } finally { if (!keepConnection) { if (!succeeded) { connection.close(); } } } finally { if (!succeeded) { releaseScheduler(); } } } @Override public FutureResult<Connection> getConnectionAsync( final ResultHandler<? super Connection> handler) { acquireScheduler(); // Protect scheduler. // Create a future responsible for chaining the initial heartbeat search. final ConnectionFutureResultImpl compositeFuture = new ConnectionFutureResultImpl(handler); opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -27,8 +27,11 @@ package org.forgerock.opendj.ldap; import static java.util.Arrays.asList; import static org.fest.assertions.Assertions.assertThat; import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool; import static org.forgerock.opendj.ldap.Connections.newHeartBeatConnectionFactory; import static org.forgerock.opendj.ldap.Connections.newLoadBalancer; import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult; import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress; import static org.forgerock.opendj.ldap.TestCaseUtils.getServerSocketAddress; @@ -651,15 +654,18 @@ } } @Test(description = "Test for OPENDJ-1121", enabled = false) @Test(description = "Test for OPENDJ-1121: Closing a connection after " + "closing the connection factory causes NPE") public void testFactoryCloseBeforeConnectionClose() throws Exception { final ConnectionFactory factory = newFixedConnectionPool(new LDAPConnectionFactory(getServerSocketAddress()), 2); newLoadBalancer(new FailoverLoadBalancingAlgorithm(asList(newFixedConnectionPool( newHeartBeatConnectionFactory(new LDAPConnectionFactory( getServerSocketAddress())), 2)))); Connection conn = null; try { conn = factory.getConnection(); factory.close(); } finally { factory.close(); if (conn != null) { conn.close(); }