mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
21.30.2013 f51e4456baf7d5538f8d5e06dddba6aa25c67b33
Backport fix OPENDJ-1121: Closing a connection after closing the connectionfactory causes NPE
4 files modified
143 ■■■■ changed files
opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java 1 ●●●● patch | view | raw | blame | history
opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java 58 ●●●● patch | view | raw | blame | history
opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java 62 ●●●● patch | view | raw | blame | history
opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java 22 ●●●●● patch | view | raw | blame | history
opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -642,6 +642,7 @@
        }
        factory.getTimeoutChecker().removeConnection(this);
        connection.closeSilently();
        factory.releaseTransportAndTimeoutChecker();
        // Notify listeners.
        if (tmpListeners != null) {
opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
@@ -35,6 +35,7 @@
import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
@@ -100,6 +101,7 @@
            // Give up immediately if the future has been cancelled.
            if (future.isCancelled()) {
                connection.close();
                releaseTransportAndTimeoutChecker();
                return;
            }
@@ -138,7 +140,6 @@
                                public void failed(final Throwable throwable) {
                                    onFailure(connection, throwable);
                                }
                            });
                } catch (final IOException e) {
                    onFailure(connection, e);
@@ -150,6 +151,7 @@
        public void failed(final Throwable throwable) {
            // Adapt and forward.
            future.handleErrorResult(adaptConnectionException(throwable));
            releaseTransportAndTimeoutChecker();
        }
        @Override
@@ -186,6 +188,7 @@
            // Abort connection attempt due to error.
            connection.close();
            future.handleErrorResult(adaptConnectionException(t));
            releaseTransportAndTimeoutChecker();
        }
        private void onSuccess(final LDAPConnection connection) {
@@ -194,6 +197,7 @@
            // Close the connection if the future was cancelled.
            if (future.isCancelled()) {
                connection.close();
                releaseTransportAndTimeoutChecker();
            }
        }
    }
@@ -202,8 +206,20 @@
    private final FilterChain defaultFilterChain;
    private final LDAPOptions options;
    private final SocketAddress socketAddress;
    private final ReferenceCountedObject<TCPNIOTransport>.Reference transport;
    /**
     * Prevents the transport and timeoutChecker being released when there are
     * remaining references (this factory or any connections). It is initially
     * set to 1 because this factory has a reference.
     */
    private final AtomicInteger referenceCount = new AtomicInteger(1);
    /**
     * Indicates whether this factory has been closed or not.
     */
    private final AtomicBoolean isClosed = new AtomicBoolean();
    private final ReferenceCountedObject<TCPNIOTransport>.Reference transport;
    private final ReferenceCountedObject<TimeoutChecker>.Reference timeoutChecker = TIMEOUT_CHECKER
            .acquire();
@@ -230,8 +246,7 @@
    @Override
    public void close() {
        if (isClosed.compareAndSet(false, true)) {
            transport.release();
            timeoutChecker.release();
            releaseTransportAndTimeoutChecker();
        }
    }
@@ -247,6 +262,7 @@
    @Override
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
        acquireTransportAndTimeoutChecker(); // Protect resources.
        final SocketConnectorHandler connectorHandler =
                TCPNIOConnectorHandler.builder(transport.get()).processor(defaultFilterChain)
                        .build();
@@ -266,6 +282,15 @@
        return socketAddress;
    }
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
        builder.append("LDAPConnectionFactory(");
        builder.append(getSocketAddress().toString());
        builder.append(')');
        return builder.toString();
    }
    TimeoutChecker getTimeoutChecker() {
        return timeoutChecker.get();
    }
@@ -274,12 +299,23 @@
        return options;
    }
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
        builder.append("LDAPConnectionFactory(");
        builder.append(getSocketAddress().toString());
        builder.append(')');
        return builder.toString();
    void releaseTransportAndTimeoutChecker() {
        if (referenceCount.decrementAndGet() == 0) {
            transport.release();
            timeoutChecker.release();
        }
    }
    private void acquireTransportAndTimeoutChecker() {
        /*
         * If the factory is not closed then we need to prevent the resources
         * (transport, timeout checker) from being released while the connection
         * attempt is in progress.
         */
        referenceCount.incrementAndGet();
        if (isClosed.get()) {
            releaseTransportAndTimeoutChecker();
            throw new IllegalStateException("Attempted to get a connection after factory close");
        }
    }
}
opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -44,6 +44,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.logging.Level;
@@ -190,6 +191,7 @@
                        connection.close();
                        connection = null;
                    }
                    releaseScheduler();
                    return adaptHeartBeatError(errorResult);
                }
@@ -204,7 +206,6 @@
            // Create a future which will handle connection result.
            this.futureConnectionResult =
                    new RecursiveFutureResult<Connection, Result>(futureSearchResult) {
                        @Override
                        protected FutureResult<? extends Result> chainResult(
                                final Connection innerResult,
@@ -710,6 +711,7 @@
                        heartBeatFuture.cancel(false);
                    }
                }
                releaseScheduler();
            }
        }
@@ -1130,11 +1132,20 @@
     * responses) before starting to send heartbeats.
     */
    private final long minDelayMS;
    /**
     * Prevents the scheduler being released when there are remaining references
     * (this factory or any connections). It is initially set to 1 because this
     * factory has a reference.
     */
    private final AtomicInteger referenceCount = new AtomicInteger(1);
    /**
     * The heartbeat scheduler.
     */
    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
    /**
     * Scheduled task which sends heart beats for all valid idle connections.
     */
@@ -1191,29 +1202,54 @@
                    }
                }
            }
            scheduler.release();
            releaseScheduler();
            factory.close();
        }
    }
    private void releaseScheduler() {
        if (referenceCount.decrementAndGet() == 0) {
            scheduler.release();
        }
    }
    private void acquireScheduler() {
        /*
         * If the factory is not closed then we need to prevent the scheduler
         * from being released while the connection attempt is in progress.
         */
        referenceCount.incrementAndGet();
        if (isClosed.get()) {
            releaseScheduler();
            throw new IllegalStateException("Attempted to get a connection after factory close");
        }
    }
    @Override
    public Connection getConnection() throws ErrorResultException {
        /*
         * Immediately send a heart beat in order to determine if the connected
         * server is responding.
         */
        final Connection connection = factory.getConnection();
        boolean keepConnection = false;
        acquireScheduler(); // Protect scheduler.
        boolean succeeded = false;
        try {
            connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS,
                    TimeUnit.MILLISECONDS);
            keepConnection = true;
            return adaptConnection(connection);
        } catch (final Exception e) {
            throw adaptHeartBeatError(e);
            final Connection connection = factory.getConnection();
            try {
                connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS,
                        TimeUnit.MILLISECONDS);
                succeeded = true;
                return adaptConnection(connection);
            } catch (final Exception e) {
                throw adaptHeartBeatError(e);
            } finally {
                if (!succeeded) {
                    connection.close();
                }
            }
        } finally {
            if (!keepConnection) {
                connection.close();
            if (!succeeded) {
                releaseScheduler();
            }
        }
    }
@@ -1221,6 +1257,8 @@
    @Override
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
        acquireScheduler(); // Protect scheduler.
        // Create a future responsible for chaining the initial heartbeat search.
        final ConnectionFutureResultImpl compositeFuture = new ConnectionFutureResultImpl(handler);
opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -27,7 +27,11 @@
package org.forgerock.opendj.ldap;
import static java.util.Arrays.asList;
import static org.fest.assertions.Assertions.assertThat;
import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool;
import static org.forgerock.opendj.ldap.Connections.newHeartBeatConnectionFactory;
import static org.forgerock.opendj.ldap.Connections.newLoadBalancer;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
import static org.forgerock.opendj.ldap.TestCaseUtils.getServerSocketAddress;
@@ -650,6 +654,24 @@
        }
    }
    @Test(description = "Test for OPENDJ-1121: Closing a connection after "
            + "closing the connection factory causes NPE")
    public void testFactoryCloseBeforeConnectionClose() throws Exception {
        final ConnectionFactory factory =
                newLoadBalancer(new FailoverLoadBalancingAlgorithm(asList(newFixedConnectionPool(
                        newHeartBeatConnectionFactory(new LDAPConnectionFactory(
                                getServerSocketAddress())), 2))));
        Connection conn = null;
        try {
            conn = factory.getConnection();
        } finally {
            factory.close();
            if (conn != null) {
                conn.close();
            }
        }
    }
    private void waitForCondition(Callable<Boolean> condition) throws Exception {
        long timeout = System.currentTimeMillis() + TEST_TIMEOUT_MS;
        while (!condition.call()) {