mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
21.27.2013 632803028dfe8bf67c23ece38f647c78c103cf9f
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -44,6 +44,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.logging.Level;
@@ -190,6 +191,7 @@
                        connection.close();
                        connection = null;
                    }
                    releaseScheduler();
                    return adaptHeartBeatError(errorResult);
                }
@@ -204,7 +206,6 @@
            // Create a future which will handle connection result.
            this.futureConnectionResult =
                    new RecursiveFutureResult<Connection, Result>(futureSearchResult) {
                        @Override
                        protected FutureResult<? extends Result> chainResult(
                                final Connection innerResult,
@@ -710,6 +711,7 @@
                        heartBeatFuture.cancel(false);
                    }
                }
                releaseScheduler();
            }
        }
@@ -1130,11 +1132,20 @@
     * responses) before starting to send heartbeats.
     */
    private final long minDelayMS;
    /**
     * Prevents the scheduler being released when there are remaining references
     * (this factory or any connections). It is initially set to 1 because this
     * factory has a reference.
     */
    private final AtomicInteger referenceCount = new AtomicInteger(1);
    /**
     * The heartbeat scheduler.
     */
    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
    /**
     * Scheduled task which sends heart beats for all valid idle connections.
     */
@@ -1191,29 +1202,54 @@
                    }
                }
            }
            scheduler.release();
            releaseScheduler();
            factory.close();
        }
    }
    private void releaseScheduler() {
        if (referenceCount.decrementAndGet() == 0) {
            scheduler.release();
        }
    }
    private void acquireScheduler() {
        /*
         * If the factory is not closed then we need to prevent the scheduler
         * from being released while the connection attempt is in progress.
         */
        referenceCount.incrementAndGet();
        if (isClosed.get()) {
            releaseScheduler();
            throw new IllegalStateException("Attempted to get a connection after factory close");
        }
    }
    @Override
    public Connection getConnection() throws ErrorResultException {
        /*
         * Immediately send a heart beat in order to determine if the connected
         * server is responding.
         */
        final Connection connection = factory.getConnection();
        boolean keepConnection = false;
        acquireScheduler(); // Protect scheduler.
        boolean succeeded = false;
        try {
            connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS,
                    TimeUnit.MILLISECONDS);
            keepConnection = true;
            return adaptConnection(connection);
        } catch (final Exception e) {
            throw adaptHeartBeatError(e);
            final Connection connection = factory.getConnection();
            try {
                connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS,
                        TimeUnit.MILLISECONDS);
                succeeded = true;
                return adaptConnection(connection);
            } catch (final Exception e) {
                throw adaptHeartBeatError(e);
            } finally {
                if (!succeeded) {
                    connection.close();
                }
            }
        } finally {
            if (!keepConnection) {
                connection.close();
            if (!succeeded) {
                releaseScheduler();
            }
        }
    }
@@ -1221,6 +1257,8 @@
    @Override
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
        acquireScheduler(); // Protect scheduler.
        // Create a future responsible for chaining the initial heartbeat search.
        final ConnectionFutureResultImpl compositeFuture = new ConnectionFutureResultImpl(handler);