mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
18.37.2013 3f7ddbf313aaabbfba4650cb2036cb41e51a9bde
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransport.java
@@ -22,133 +22,139 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 *      Portions copyright 2011-2013 ForgeRock AS
 */
package com.forgerock.opendj.ldap;
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import java.io.IOException;
import java.util.logging.Level;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
import org.glassfish.grizzly.strategies.WorkerThreadIOStrategy;
import com.forgerock.opendj.util.ReferenceCountedObject;
/**
 * The default {@link TCPNIOTransport} which all {@code LDAPConnectionFactory}s
 * and {@code LDAPListener}s will use unless otherwise specified in their
 * options.
 */
final class DefaultTCPNIOTransport {
    private static TCPNIOTransport defaultTransport = null;
    /**
     * Returns the default {@link TCPNIOTransport} which all
     * {@code LDAPConnectionFactory}s and {@code LDAPListener}s will use unless
     * otherwise specified in their options.
     *
     * @return The default {@link TCPNIOTransport}.
     */
    static synchronized TCPNIOTransport getInstance() {
        if (defaultTransport == null) {
            final TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
            // Determine which threading strategy to use, and total number of
            // threads.
            final String useWorkerThreadsStr =
                    System.getProperty("org.forgerock.opendj.transport.useWorkerThreads");
            final boolean useWorkerThreadStrategy;
            if (useWorkerThreadsStr != null) {
                useWorkerThreadStrategy = Boolean.parseBoolean(useWorkerThreadsStr);
            } else {
                // The most best performing strategy to use is the
                // SameThreadIOStrategy, however it can only be used in cases
                // where result listeners will not block.
                useWorkerThreadStrategy = true;
            }
            if (useWorkerThreadStrategy) {
                builder.setIOStrategy(WorkerThreadIOStrategy.getInstance());
            } else {
                builder.setIOStrategy(SameThreadIOStrategy.getInstance());
            }
            // Calculate thread counts.
            final int cpus = Runtime.getRuntime().availableProcessors();
            // Calculate the number of selector threads.
            final String selectorsStr =
                    System.getProperty("org.forgerock.opendj.transport.selectors");
            final int selectorThreadCount;
            if (selectorsStr != null) {
                selectorThreadCount = Integer.parseInt(selectorsStr);
            } else {
                selectorThreadCount =
                        useWorkerThreadStrategy ? Math.max(2, cpus / 4) : Math.max(5,
                                (cpus / 2) - 1);
            }
            builder.getSelectorThreadPoolConfig().setCorePoolSize(selectorThreadCount)
                    .setMaxPoolSize(selectorThreadCount).setPoolName(
                            "OpenDJ LDAP SDK Grizzly selector thread");
            // Calculate the number of worker threads.
            if (builder.getWorkerThreadPoolConfig() != null) {
                final String workersStr =
                        System.getProperty("org.forgerock.opendj.transport.workers");
                final int workerThreadCount;
                if (workersStr != null) {
                    workerThreadCount = Integer.parseInt(workersStr);
                } else {
                    workerThreadCount = useWorkerThreadStrategy ? Math.max(5, (cpus * 2)) : 0;
                }
                builder.getWorkerThreadPoolConfig().setCorePoolSize(workerThreadCount)
                        .setMaxPoolSize(workerThreadCount).setPoolName(
                                "OpenDJ LDAP SDK Grizzly worker thread");
            }
            // Parse IO related options.
            final String lingerStr = System.getProperty("org.forgerock.opendj.transport.linger");
            if (lingerStr != null) {
                // Disabled by default.
                builder.setLinger(Integer.parseInt(lingerStr));
            }
            final String tcpNoDelayStr =
                    System.getProperty("org.forgerock.opendj.transport.tcpNoDelay");
            if (tcpNoDelayStr != null) {
                // Enabled by default.
                builder.setTcpNoDelay(Boolean.parseBoolean(tcpNoDelayStr));
            }
            final String reuseAddressStr =
                    System.getProperty("org.forgerock.opendj.transport.reuseAddress");
            if (reuseAddressStr != null) {
                // Enabled by default.
                builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr));
            }
            defaultTransport = builder.build();
            // FIXME: raise bug in Grizzly. We should not need to do this, but
            // failure to do so causes many deadlocks.
            defaultTransport.setSelectorRunnersCount(selectorThreadCount);
            try {
                defaultTransport.start();
            } catch (final IOException e) {
                throw new RuntimeException(e);
            }
        }
        return defaultTransport;
    }
final class DefaultTCPNIOTransport extends ReferenceCountedObject<TCPNIOTransport> {
    static final DefaultTCPNIOTransport DEFAULT_TRANSPORT = new DefaultTCPNIOTransport();
    private DefaultTCPNIOTransport() {
        // Prevent instantiation.
    }
    @Override
    protected void destroyInstance(final TCPNIOTransport instance) {
        try {
            instance.stop();
        } catch (final IOException e) {
            DEBUG_LOG.log(Level.WARNING,
                    "An error occurred while shutting down the Grizzly transport", e.getMessage());
        }
    }
    @Override
    protected TCPNIOTransport newInstance() {
        final TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
        /*
         * Determine which threading strategy to use, and total number of
         * threads.
         */
        final String useWorkerThreadsStr =
                System.getProperty("org.forgerock.opendj.transport.useWorkerThreads");
        final boolean useWorkerThreadStrategy;
        if (useWorkerThreadsStr != null) {
            useWorkerThreadStrategy = Boolean.parseBoolean(useWorkerThreadsStr);
        } else {
            /*
             * The most best performing strategy to use is the
             * SameThreadIOStrategy, however it can only be used in cases where
             * result listeners will not block.
             */
            useWorkerThreadStrategy = true;
        }
        if (useWorkerThreadStrategy) {
            builder.setIOStrategy(WorkerThreadIOStrategy.getInstance());
        } else {
            builder.setIOStrategy(SameThreadIOStrategy.getInstance());
        }
        // Calculate thread counts.
        final int cpus = Runtime.getRuntime().availableProcessors();
        // Calculate the number of selector threads.
        final String selectorsStr = System.getProperty("org.forgerock.opendj.transport.selectors");
        final int selectorThreadCount;
        if (selectorsStr != null) {
            selectorThreadCount = Integer.parseInt(selectorsStr);
        } else {
            selectorThreadCount =
                    useWorkerThreadStrategy ? Math.max(2, cpus / 4) : Math.max(5, (cpus / 2) - 1);
        }
        builder.getSelectorThreadPoolConfig().setCorePoolSize(selectorThreadCount).setMaxPoolSize(
                selectorThreadCount).setPoolName("OpenDJ LDAP SDK Grizzly selector thread");
        // Calculate the number of worker threads.
        if (builder.getWorkerThreadPoolConfig() != null) {
            final String workersStr = System.getProperty("org.forgerock.opendj.transport.workers");
            final int workerThreadCount;
            if (workersStr != null) {
                workerThreadCount = Integer.parseInt(workersStr);
            } else {
                workerThreadCount = useWorkerThreadStrategy ? Math.max(5, (cpus * 2)) : 0;
            }
            builder.getWorkerThreadPoolConfig().setCorePoolSize(workerThreadCount).setMaxPoolSize(
                    workerThreadCount).setPoolName("OpenDJ LDAP SDK Grizzly worker thread");
        }
        // Parse IO related options.
        final String lingerStr = System.getProperty("org.forgerock.opendj.transport.linger");
        if (lingerStr != null) {
            // Disabled by default.
            builder.setLinger(Integer.parseInt(lingerStr));
        }
        final String tcpNoDelayStr =
                System.getProperty("org.forgerock.opendj.transport.tcpNoDelay");
        if (tcpNoDelayStr != null) {
            // Enabled by default.
            builder.setTcpNoDelay(Boolean.parseBoolean(tcpNoDelayStr));
        }
        final String reuseAddressStr =
                System.getProperty("org.forgerock.opendj.transport.reuseAddress");
        if (reuseAddressStr != null) {
            // Enabled by default.
            builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr));
        }
        final TCPNIOTransport transport = builder.build();
        // FIXME: raise bug in Grizzly. We should not need to do this, but
        // failure to do so causes many deadlocks.
        transport.setSelectorRunnersCount(selectorThreadCount);
        try {
            transport.start();
        } catch (final IOException e) {
            throw new RuntimeException(e);
        }
        return transport;
    }
}
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java
@@ -513,7 +513,6 @@
    }
    /**
     * Associates an LDAP connection with its underlying Grizzly connection.
     * <p>
     * The LDAP connection is first added to {@code TimeoutChecker.INSTANCE}
     * and is then stored on the Grizzly connection through
     * {@code LDAP_CONNECTION_ATTR}.
     *
     * @param connection
     *            The Grizzly connection.
     * @param ldapConnection
     *            The LDAP connection to be attached to the Grizzly connection.
     */
    void registerConnection(final Connection<?> connection, final LDAPConnection ldapConnection) {
        TimeoutChecker.INSTANCE.addConnection(ldapConnection);
        LDAP_CONNECTION_ATTR.set(connection, ldapConnection);
    }
}
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -27,6 +27,7 @@
package com.forgerock.opendj.ldap;
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.io.IOException;
@@ -81,7 +82,6 @@
import org.glassfish.grizzly.ssl.SSLFilter;
import com.forgerock.opendj.util.CompletedFutureResult;
import com.forgerock.opendj.util.StaticUtils;
import com.forgerock.opendj.util.Validator;
/**
@@ -109,7 +109,7 @@
    private final org.glassfish.grizzly.Connection<?> connection;
    private final LDAPWriter ldapWriter = new LDAPWriter();
    private final AtomicInteger nextMsgID = new AtomicInteger(1);
    private final LDAPOptions options;
    private final LDAPConnectionFactoryImpl factory;
    private final ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>> pendingRequests =
            new ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>>();
    private final Object stateLock = new Object();
@@ -120,22 +120,12 @@
    private boolean isFailed = false;
    private List<ConnectionEventListener> listeners = null;
    /**
     * Creates a new LDAP connection.
     *
     * @param connection
     *            The Grizzly connection.
     * @param options
     *            The LDAP client options.
     */
    LDAPConnection(final org.glassfish.grizzly.Connection<?> connection, final LDAPOptions options) {
    LDAPConnection(final org.glassfish.grizzly.Connection<?> connection,
            final LDAPConnectionFactoryImpl factory) {
        this.connection = connection;
        this.options = options;
        this.factory = factory;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Void> abandonAsync(final AbandonRequest request) {
        final AbstractLDAPFutureResultImpl<?> pendingRequest;
@@ -148,9 +138,11 @@
                pendingRequest = pendingRequests.remove(request.getRequestID());
            }
            if (pendingRequest == null) {
                // There has never been a request with the specified message ID or
                // the response has already been received and handled. We can ignore
                // this abandon request.
                /*
                 * There has never been a request with the specified message ID
                 * or the response has already been received and handled. We can
                 * ignore this abandon request.
                 */
                // Message ID will be -1 since no request was sent.
                return new CompletedFutureResult<Void>((Void) null);
@@ -173,9 +165,6 @@
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> addAsync(final AddRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
@@ -208,9 +197,6 @@
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void addConnectionEventListener(final ConnectionEventListener listener) {
        Validator.ensureNotNull(listener);
@@ -236,9 +222,6 @@
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<BindResult> bindAsync(final BindRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
@@ -308,9 +291,6 @@
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close(final UnbindRequest request, final String reason) {
        // FIXME: I18N need to internationalize this message.
@@ -321,9 +301,6 @@
                        "Connection closed by client" + (reason != null ? ": " + reason : "")));
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<CompareResult> compareAsync(final CompareRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
@@ -356,9 +333,6 @@
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> deleteAsync(final DeleteRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
@@ -391,9 +365,6 @@
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
            final ExtendedRequest<R> request,
@@ -447,9 +418,6 @@
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isClosed() {
        synchronized (stateLock) {
@@ -457,9 +425,6 @@
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isValid() {
        synchronized (stateLock) {
@@ -467,9 +432,6 @@
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyAsync(final ModifyRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
@@ -502,9 +464,6 @@
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
@@ -537,9 +496,6 @@
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void removeConnectionEventListener(final ConnectionEventListener listener) {
        Validator.ensureNotNull(listener);
@@ -550,9 +506,6 @@
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> searchAsync(final SearchRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
@@ -585,9 +538,6 @@
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
@@ -600,7 +550,7 @@
    }
    long cancelExpiredRequests(final long currentTime) {
        final long timeout = options.getTimeout(TimeUnit.MILLISECONDS);
        final long timeout = factory.getLDAPOptions().getTimeout(TimeUnit.MILLISECONDS);
        long delay = timeout;
        if (timeout > 0) {
            for (final int requestID : pendingRequests.keySet()) {
@@ -608,10 +558,9 @@
                if (future != null) {
                    final long diff = (future.getTimestamp() + timeout) - currentTime;
                    if (diff <= 0 && pendingRequests.remove(requestID) != null) {
                        StaticUtils.DEBUG_LOG.fine("Cancelling expired future result: " + future);
                        DEBUG_LOG.fine("Cancelling expired future result: " + future);
                        final Result result = Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT);
                        future.adaptErrorResult(result);
                        abandonAsync(Requests.newAbandonRequest(future.getRequestID()));
                    } else {
                        delay = Math.min(delay, diff);
@@ -691,7 +640,7 @@
                // Underlying channel prob blown up. Just ignore.
            }
        }
        TimeoutChecker.INSTANCE.removeConnection(this);
        factory.getTimeoutChecker().removeConnection(this);
        connection.closeSilently();
        // Notify listeners.
@@ -721,7 +670,7 @@
    }
    LDAPOptions getLDAPOptions() {
        return options;
        return factory.getLDAPOptions();
    }
    AbstractLDAPFutureResultImpl<?> getPendingRequest(final Integer messageID) {
@@ -837,12 +786,14 @@
    private void checkConnectionIsValid() throws ErrorResultException {
        if (!isValid0()) {
            if (failedDueToDisconnect) {
                // Connection termination was triggered remotely. We don't want
                // to blindly pass on the result code to requests since it could
                // be confused for a genuine response. For example, if the
                // disconnect contained the invalidCredentials result code then
                // this could be misinterpreted as a genuine authentication
                // failure for subsequent bind requests.
                /*
                 * Connection termination was triggered remotely. We don't want
                 * to blindly pass on the result code to requests since it could
                 * be confused for a genuine response. For example, if the
                 * disconnect contained the invalidCredentials result code then
                 * this could be misinterpreted as a genuine authentication
                 * failure for subsequent bind requests.
                 */
                throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN,
                        "Connection closed by server");
            } else {
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
@@ -27,11 +27,14 @@
package com.forgerock.opendj.ldap;
import static com.forgerock.opendj.ldap.DefaultTCPNIOTransport.DEFAULT_TRANSPORT;
import static com.forgerock.opendj.ldap.TimeoutChecker.TIMEOUT_CHECKER;
import static org.forgerock.opendj.ldap.ErrorResultException.*;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLEngine;
@@ -55,6 +58,7 @@
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.ReferenceCountedObject;
/**
 * LDAP connection factory implementation.
@@ -154,10 +158,14 @@
        }
        private LDAPConnection adaptConnection(final org.glassfish.grizzly.Connection<?> connection) {
            // Tests show that it is much faster with non-blocking writes, but
            // there is a risk of running out of memory if the server is slow.
            /*
             * Tests show that it is much faster with non-blocking writes, but
             * there is a risk of running out of memory if the server is slow.
             */
            connection.configureBlocking(true);
            final LDAPConnection ldapConnection = new LDAPConnection(connection, options);
            final LDAPConnection ldapConnection =
                    new LDAPConnection(connection, LDAPConnectionFactoryImpl.this);
            timeoutChecker.get().addConnection(ldapConnection);
            clientFilter.registerConnection(connection, ldapConnection);
            return ldapConnection;
        }
@@ -194,7 +202,10 @@
    private final FilterChain defaultFilterChain;
    private final LDAPOptions options;
    private final SocketAddress socketAddress;
    private final TCPNIOTransport transport;
    private final ReferenceCountedObject<TCPNIOTransport>.Reference transport;
    private final AtomicBoolean isClosed = new AtomicBoolean();
    private final ReferenceCountedObject<TimeoutChecker>.Reference timeoutChecker = TIMEOUT_CHECKER
            .acquire();
    /**
     * Creates a new LDAP connection factory implementation which can be used to
@@ -207,11 +218,7 @@
     *            The LDAP connection options to use when creating connections.
     */
    public LDAPConnectionFactoryImpl(final SocketAddress address, final LDAPOptions options) {
        if (options.getTCPNIOTransport() == null) {
            this.transport = DefaultTCPNIOTransport.getInstance();
        } else {
            this.transport = options.getTCPNIOTransport();
        }
        this.transport = DEFAULT_TRANSPORT.acquireIfNull(options.getTCPNIOTransport());
        this.socketAddress = address;
        this.options = new LDAPOptions(options);
        this.clientFilter =
@@ -220,9 +227,14 @@
                FilterChainBuilder.stateless().add(new TransportFilter()).add(clientFilter).build();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close() {
        if (isClosed.compareAndSet(false, true)) {
            transport.release();
            timeoutChecker.release();
        }
    }
    @Override
    public Connection getConnection() throws ErrorResultException {
        try {
@@ -232,14 +244,12 @@
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
        final SocketConnectorHandler connectorHandler =
                TCPNIOConnectorHandler.builder(transport).processor(defaultFilterChain).build();
                TCPNIOConnectorHandler.builder(transport.get()).processor(defaultFilterChain)
                        .build();
        final AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> future =
                new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(handler);
        final CompletionHandlerAdapter cha = new CompletionHandlerAdapter(future);
@@ -256,9 +266,14 @@
        return socketAddress;
    }
    /**
     * {@inheritDoc}
     */
    TimeoutChecker getTimeoutChecker() {
        return timeoutChecker.get();
    }
    LDAPOptions getLDAPOptions() {
        return options;
    }
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPListenerImpl.java
@@ -22,14 +22,18 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS
 *      Portions copyright 2011-2013 ForgeRock AS
 */
package com.forgerock.opendj.ldap;
import static com.forgerock.opendj.ldap.DefaultTCPNIOTransport.DEFAULT_TRANSPORT;
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import java.io.Closeable;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.forgerock.opendj.ldap.DecodeOptions;
@@ -43,16 +47,17 @@
import org.glassfish.grizzly.nio.transport.TCPNIOServerConnection;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import com.forgerock.opendj.util.StaticUtils;
import com.forgerock.opendj.util.ReferenceCountedObject;
/**
 * LDAP listener implementation.
 */
public final class LDAPListenerImpl implements Closeable {
    private final TCPNIOTransport transport;
    private final ReferenceCountedObject<TCPNIOTransport>.Reference transport;
    private final FilterChain defaultFilterChain;
    private final ServerConnectionFactory<LDAPClientContext, Integer> connectionFactory;
    private final TCPNIOServerConnection serverConnection;
    private final AtomicBoolean isClosed = new AtomicBoolean();
    /**
     * Creates a new LDAP listener implementation which will listen for LDAP
@@ -72,11 +77,7 @@
    public LDAPListenerImpl(final SocketAddress address,
            final ServerConnectionFactory<LDAPClientContext, Integer> factory,
            final LDAPListenerOptions options) throws IOException {
        if (options.getTCPNIOTransport() == null) {
            this.transport = DefaultTCPNIOTransport.getInstance();
        } else {
            this.transport = options.getTCPNIOTransport();
        }
        this.transport = DEFAULT_TRANSPORT.acquireIfNull(options.getTCPNIOTransport());
        this.connectionFactory = factory;
        final DecodeOptions decodeOptions = new DecodeOptions(options.getDecodeOptions());
@@ -85,26 +86,22 @@
                        new LDAPServerFilter(this, new LDAPReader(decodeOptions), options
                                .getMaxRequestSize())).build();
        final TCPNIOBindingHandler bindingHandler =
                TCPNIOBindingHandler.builder(transport).processor(defaultFilterChain).build();
                TCPNIOBindingHandler.builder(transport.get()).processor(defaultFilterChain).build();
        this.serverConnection = bindingHandler.bind(address, options.getBacklog());
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close() {
        try {
            serverConnection.close().get();
        } catch (final InterruptedException e) {
            // Cannot handle here.
            Thread.currentThread().interrupt();
        } catch (final Exception e) {
            // Ignore the exception.
            if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) {
                StaticUtils.DEBUG_LOG.log(Level.WARNING,
                        "Exception occurred while closing listener:" + e.getMessage(), e);
        if (isClosed.compareAndSet(false, true)) {
            try {
                serverConnection.close().get();
            } catch (final InterruptedException e) {
                // Cannot handle here.
                Thread.currentThread().interrupt();
            } catch (final Exception e) {
                DEBUG_LOG.log(Level.WARNING, "Exception occurred while closing listener", e);
            }
            transport.release();
        }
    }
@@ -117,9 +114,7 @@
        return serverConnection.getLocalAddress();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
        builder.append("LDAPListener(");
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java
@@ -22,47 +22,64 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package com.forgerock.opendj.ldap;
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import org.glassfish.grizzly.utils.LinkedTransferQueue;
import com.forgerock.opendj.util.StaticUtils;
import com.forgerock.opendj.util.ReferenceCountedObject;
/**
 * Checks connection for pending requests that have timed out.
 */
final class TimeoutChecker {
    static final TimeoutChecker INSTANCE = new TimeoutChecker();
    static final ReferenceCountedObject<TimeoutChecker> TIMEOUT_CHECKER =
            new ReferenceCountedObject<TimeoutChecker>() {
                @Override
                protected void destroyInstance(final TimeoutChecker instance) {
                    instance.shutdown();
                }
    private final LinkedTransferQueue<LDAPConnection> connections;
    private transient final ReentrantLock lock;
    private transient final Condition available;
                @Override
                protected TimeoutChecker newInstance() {
                    return new TimeoutChecker();
                }
            };
    private final Condition available;
    private final List<LDAPConnection> connections;
    private final ReentrantLock lock;
    private boolean shutdownRequested = false;
    private TimeoutChecker() {
        this.connections = new LinkedTransferQueue<LDAPConnection>();
        this.connections = new LinkedList<LDAPConnection>();
        this.lock = new ReentrantLock();
        this.available = lock.newCondition();
        final Thread checkerThread = new Thread("Timeout Checker") {
        final Thread checkerThread = new Thread("OpenDJ LDAP SDK Connection Timeout Checker") {
            @Override
            public void run() {
                StaticUtils.DEBUG_LOG.fine("Timeout Checker Starting");
                final ReentrantLock lock = TimeoutChecker.this.lock;
                DEBUG_LOG.fine("Timeout Checker Starting");
                lock.lock();
                try {
                    while (true) {
                    while (!shutdownRequested) {
                        final long currentTime = System.currentTimeMillis();
                        long delay = 0;
                        for (final LDAPConnection connection : connections) {
                            StaticUtils.DEBUG_LOG.finer("Checking connection " + connection
                                    + " delay = " + delay);
                            if (DEBUG_LOG.isLoggable(Level.FINER)) {
                                DEBUG_LOG.finer("Checking connection " + connection + " delay = "
                                        + delay);
                            }
                            final long newDelay = connection.cancelExpiredRequests(currentTime);
                            if (newDelay > 0) {
                                if (delay > 0) {
@@ -75,15 +92,17 @@
                        try {
                            if (delay <= 0) {
                                StaticUtils.DEBUG_LOG.finer("There are no connections with "
                                DEBUG_LOG.finer("There are no connections with "
                                        + "timeout specified. Sleeping");
                                available.await();
                            } else {
                                StaticUtils.DEBUG_LOG.finer("Sleeping for " + delay + "ms");
                                if (DEBUG_LOG.isLoggable(Level.FINER)) {
                                    DEBUG_LOG.log(Level.FINER, "Sleeping for " + delay + " ms");
                                }
                                available.await(delay, TimeUnit.MILLISECONDS);
                            }
                        } catch (final InterruptedException e) {
                            // Just go around again.
                            shutdownRequested = true;
                        }
                    }
                } finally {
@@ -97,7 +116,6 @@
    }
    void addConnection(final LDAPConnection connection) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            connections.add(connection);
@@ -108,7 +126,6 @@
    }
    void removeConnection(final LDAPConnection connection) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            connections.remove(connection);
@@ -116,4 +133,14 @@
            lock.unlock();
        }
    }
    private void shutdown() {
        lock.lock();
        try {
            shutdownRequested = true;
            available.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/ReferenceCountedObject.java
New file
@@ -0,0 +1,162 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2013 ForgeRock AS.
 */
package com.forgerock.opendj.util;
/**
 * An object which is lazily created when first referenced, and destroyed when
 * the last reference is released.
 * <p>
 * Reference counting is guarded by an internal lock. {@link #newInstance()} is
 * invoked while the lock is held; {@link #destroyInstance(Object)} is invoked
 * after the lock has been dropped.
 *
 * @param <T>
 *            The type of referenced object.
 */
public abstract class ReferenceCountedObject<T> {
    /**
     * A reference to the reference counted object which will automatically be
     * released during garbage collection.
     */
    public final class Reference {
        private T value;
        /** Set once this reference has been released. Guarded by lock. */
        private boolean released = false;
        private Reference(final T value) {
            this.value = value;
        }
        /**
         * Returns the referenced object.
         *
         * @return The referenced object.
         * @throws NullPointerException
         *             If the referenced object has already been released.
         */
        public T get() {
            final T current = value;
            if (current == null) {
                throw new NullPointerException(); // Fail-fast.
            }
            return current;
        }
        /**
         * Decrements the reference count for the reference counted object if
         * this reference refers to the reference counted instance. If the
         * reference count drops to zero then the referenced object will be
         * destroyed.
         * <p>
         * Releasing a reference more than once has no effect: only the first
         * call can decrement the reference count.
         */
        public void release() {
            T instanceToRelease = null;
            boolean destroy = false;
            synchronized (lock) {
                if (released) {
                    /*
                     * Already released, e.g. explicitly and then again by the
                     * finalizer. Decrementing here would corrupt the count.
                     */
                    return;
                }
                released = true;
                if (instance == value && --refCount == 0) {
                    // This was the last reference to the shared instance.
                    instanceToRelease = value;
                    instance = null;
                    destroy = instanceToRelease != null;
                }
                /*
                 * Force NPE for subsequent get() attempts.
                 */
                value = null;
            }
            // Destroy outside of the lock: this may be slow.
            if (destroy) {
                destroyInstance(instanceToRelease);
            }
        }
        /**
         * Provide a finalizer because reference counting is intended for
         * expensive rarely created resources which should not be accidentally
         * left around.
         */
        @Override
        protected void finalize() {
            release();
        }
    }
    private T instance = null;
    private final Object lock = new Object();
    private int refCount = 0;
    /**
     * Creates a new referenced object whose reference count is initially zero.
     */
    protected ReferenceCountedObject() {
        // Nothing to do.
    }
    /**
     * Returns a reference to the reference counted object, creating the
     * object first if the reference count is currently zero.
     * <p>
     * If {@link #newInstance()} throws, the reference count is left unchanged
     * so that a later call can try again.
     *
     * @return A reference to the reference counted object.
     */
    public final Reference acquire() {
        synchronized (lock) {
            if (refCount == 0) {
                assert instance == null;
                instance = newInstance();
            }
            // Only count the reference once the instance is known to exist.
            refCount++;
            return new Reference(instance);
        }
    }
    /**
     * Returns a reference to the provided object or, if it is {@code null}, a
     * reference to the reference counted object.
     *
     * @param value
     *            The object to be referenced, or {@code null} if the reference
     *            counted object should be used.
     * @return A reference to the provided object or, if it is {@code null}, a
     *         reference to the reference counted object.
     */
    public final Reference acquireIfNull(final T value) {
        return value != null ? new Reference(value) : acquire();
    }
    /**
     * Invoked when a reference is released and the reference count will become
     * zero. Implementations should release any resources associated with the
     * resource and should not return until the resources have been released.
     *
     * @param instance
     *            The instance to be destroyed.
     */
    protected abstract void destroyInstance(T instance);
    /**
     * Invoked when a reference is acquired and the current reference count is
     * zero. Implementations should create a new instance as fast as possible.
     *
     * @return The new instance.
     */
    protected abstract T newInstance();
}
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/StaticUtils.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS
 *      Portions copyright 2011-2013 ForgeRock AS
 */
package com.forgerock.opendj.util;
@@ -48,6 +48,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -86,9 +87,30 @@
    // UTC TimeZone is assumed to never change over JVM lifetime
    private static final TimeZone TIME_ZONE_UTC_OBJ = TimeZone.getTimeZone(TIME_ZONE_UTC);
    private static ScheduledExecutorService defaultScheduler = null;
    /**
     * The default scheduler which should be used when the application does not
     * provide one.
     */
    public static final ReferenceCountedObject<ScheduledExecutorService> DEFAULT_SCHEDULER =
            new ReferenceCountedObject<ScheduledExecutorService>() {
    private static final Object DEFAULT_SCHEDULER_LOCK = new Object();
                @Override
                protected ScheduledExecutorService newInstance() {
                    final ThreadFactory factory =
                            newThreadFactory(null, "OpenDJ LDAP SDK Default Scheduler", true);
                    return Executors.newSingleThreadScheduledExecutor(factory);
                }
                @Override
                protected void destroyInstance(ScheduledExecutorService instance) {
                    instance.shutdown();
                    try {
                        instance.awaitTermination(5, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            };
    /**
     * Retrieves a string representation of the provided byte in hexadecimal.
@@ -1395,22 +1417,6 @@
    }
    /**
     * Returns the default scheduler which should be used by the SDK.
     *
     * @return The default scheduler.
     */
    public static ScheduledExecutorService getDefaultScheduler() {
        synchronized (DEFAULT_SCHEDULER_LOCK) {
            if (defaultScheduler == null) {
                final ThreadFactory factory =
                        newThreadFactory(null, "OpenDJ SDK Default Scheduler", true);
                defaultScheduler = Executors.newSingleThreadScheduledExecutor(factory);
            }
        }
        return defaultScheduler;
    }
    /**
     * Retrieves the best human-readable message for the provided exception. For
     * exceptions defined in the OpenDJ project, it will attempt to use the
     * message (combining it with the message ID if available). For some
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractConnectionWrapper.java
@@ -55,12 +55,15 @@
 * An abstract base class from which connection wrappers may be easily
 * implemented. The default implementation of each method is to delegate to the
 * wrapped connection.
 *
 * @param <C>
 *            The type of wrapped connection.
 */
public abstract class AbstractConnectionWrapper implements Connection {
public abstract class AbstractConnectionWrapper<C extends Connection> implements Connection {
    /**
     * The wrapped connection.
     */
    protected final Connection connection;
    protected final C connection;
    /**
     * Creates a new connection wrapper.
@@ -68,7 +71,7 @@
     * @param connection
     *            The connection to be wrapped.
     */
    protected AbstractConnectionWrapper(final Connection connection) {
    protected AbstractConnectionWrapper(final C connection) {
        Validator.ensureNotNull(connection);
        this.connection = connection;
    }
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
@@ -22,11 +22,13 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS.
 *      Portions copyright 2011-2013 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
import static org.forgerock.opendj.ldap.ErrorResultException.*;
import java.util.ArrayList;
@@ -39,7 +41,7 @@
import java.util.logging.Level;
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.StaticUtils;
import com.forgerock.opendj.util.ReferenceCountedObject;
import com.forgerock.opendj.util.Validator;
/**
@@ -55,11 +57,8 @@
            ResultHandler<Connection> {
        private final ConnectionFactory factory;
        private final AtomicBoolean isOperational = new AtomicBoolean(true);
        private volatile FutureResult<?> pendingConnectFuture = null;
        private final int index;
        private MonitoredConnectionFactory(final ConnectionFactory factory, final int index) {
@@ -67,9 +66,12 @@
            this.index = index;
        }
        /**
         * {@inheritDoc}
         */
        /**
         * Closes the wrapped connection factory. The pending reconnect future,
         * if any, is not touched by this method.
         */
        @Override
        public void close() {
            // Should we cancel the future?
            factory.close();
        }
        @Override
        public Connection getConnection() throws ErrorResultException {
            final Connection connection;
@@ -87,14 +89,12 @@
            return connection;
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public FutureResult<Connection> getConnectionAsync(
                final ResultHandler<? super Connection> resultHandler) {
            final AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> future =
                   new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(resultHandler);
                    new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(
                            resultHandler);
            final ResultHandler<Connection> failoverHandler = new ResultHandler<Connection>() {
                @Override
@@ -141,9 +141,6 @@
            connection.close();
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public String toString() {
            return factory.toString();
@@ -156,9 +153,9 @@
        private synchronized void checkIfAvailable() {
            if (!isOperational.get()
                    && (pendingConnectFuture == null || pendingConnectFuture.isDone())) {
                if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
                    StaticUtils.DEBUG_LOG.fine(String
                            .format("Attempting reconnect to offline factory " + this));
                if (DEBUG_LOG.isLoggable(Level.FINE)) {
                    DEBUG_LOG.fine(String.format("Attempting reconnect to offline factory '%s'",
                            this));
                }
                pendingConnectFuture = factory.getConnectionAsync(this);
            }
@@ -167,21 +164,22 @@
        private void notifyOffline(final ErrorResultException error) {
            if (isOperational.getAndSet(false)) {
                // Transition from online to offline.
                if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) {
                    StaticUtils.DEBUG_LOG.warning(String.format("Connection factory " + factory
                            + " is no longer operational: " + error.getMessage()));
                if (DEBUG_LOG.isLoggable(Level.WARNING)) {
                    DEBUG_LOG.warning(String.format(
                            "Connection factory '%s' is no longer operational: %s", factory, error
                                    .getMessage()));
                }
                synchronized (stateLock) {
                    offlineFactoriesCount++;
                    if (offlineFactoriesCount == 1) {
                        // Enable monitoring.
                        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
                            StaticUtils.DEBUG_LOG.fine(String.format("Starting monitoring thread"));
                        if (DEBUG_LOG.isLoggable(Level.FINE)) {
                            DEBUG_LOG.fine(String.format("Starting monitoring thread"));
                        }
                        monitoringFuture =
                                scheduler.scheduleWithFixedDelay(new MonitorRunnable(), 0,
                                scheduler.get().scheduleWithFixedDelay(new MonitorRunnable(), 0,
                                        monitoringInterval, monitoringIntervalTimeUnit);
                    }
                }
@@ -191,16 +189,16 @@
        private void notifyOnline() {
            if (!isOperational.getAndSet(true)) {
                // Transition from offline to online.
                if (StaticUtils.DEBUG_LOG.isLoggable(Level.INFO)) {
                    StaticUtils.DEBUG_LOG.info(String.format("Connection factory " + factory
                            + " is now operational"));
                if (DEBUG_LOG.isLoggable(Level.INFO)) {
                    DEBUG_LOG.info(String.format("Connection factory'%s' is now operational",
                            factory));
                }
                synchronized (stateLock) {
                    offlineFactoriesCount--;
                    if (offlineFactoriesCount == 0) {
                        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
                            StaticUtils.DEBUG_LOG.fine(String.format("Stopping monitoring thread"));
                        if (DEBUG_LOG.isLoggable(Level.FINE)) {
                            DEBUG_LOG.fine(String.format("Stopping monitoring thread"));
                        }
                        monitoringFuture.cancel(false);
@@ -225,91 +223,65 @@
    }
    private final List<MonitoredConnectionFactory> monitoredFactories;
    private final ScheduledExecutorService scheduler;
    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
    private final Object stateLock = new Object();
    // Guarded by stateLock.
    private int offlineFactoriesCount = 0;
    private final long monitoringInterval;
    private final TimeUnit monitoringIntervalTimeUnit;
    // Guarded by stateLock.
    private ScheduledFuture<?> monitoringFuture;
    /**
     * Creates a new abstract load balancing algorithm which will monitor
     * offline connection factories every second using the default scheduler.
     *
     * @param factories
     *            The connection factories.
     * Guarded by stateLock.
     */
    private int offlineFactoriesCount = 0;
    private final long monitoringInterval;
    private final TimeUnit monitoringIntervalTimeUnit;
    /**
     * Guarded by stateLock.
     */
    private ScheduledFuture<?> monitoringFuture;
    private AtomicBoolean isClosed = new AtomicBoolean();
    AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories) {
        this(factories, 1, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler());
        this(factories, 1, TimeUnit.SECONDS, null);
    }
    /**
     * Creates a new abstract load balancing algorithm which will monitor
     * offline connection factories using the specified frequency using the
     * default scheduler.
     *
     * @param factories
     *            The connection factories.
     * @param interval
     *            The interval between attempts to poll offline factories.
     * @param unit
     *            The time unit for the interval between attempts to poll
     *            offline factories.
     */
    AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
            final long interval, final TimeUnit unit) {
        this(factories, interval, unit, StaticUtils.getDefaultScheduler());
        this(factories, interval, unit, null);
    }
    /**
     * Creates a new abstract load balancing algorithm which will monitor
     * offline connection factories using the specified frequency and scheduler.
     *
     * @param factories
     *            The connection factories.
     * @param interval
     *            The interval between attempts to poll offline factories.
     * @param unit
     *            The time unit for the interval between attempts to poll
     *            offline factories.
     * @param scheduler
     *            The scheduler which should be used for periodically monitoring dead
     *            connection factories to see if they are usable again.
     */
    AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
            final long interval, final TimeUnit unit, final ScheduledExecutorService scheduler) {
        Validator.ensureNotNull(factories, scheduler, unit);
        Validator.ensureNotNull(factories, unit);
        this.monitoredFactories = new ArrayList<MonitoredConnectionFactory>(factories.size());
        int i = 0;
        for (final ConnectionFactory f : factories) {
            this.monitoredFactories.add(new MonitoredConnectionFactory(f, i++));
        }
        this.scheduler = scheduler;
        this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
        this.monitoringInterval = interval;
        this.monitoringIntervalTimeUnit = unit;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close() {
        if (isClosed.compareAndSet(false, true)) {
            synchronized (stateLock) {
                if (monitoringFuture != null) {
                    monitoringFuture.cancel(false);
                    monitoringFuture = null;
                }
            }
            for (ConnectionFactory factory : monitoredFactories) {
                factory.close();
            }
            scheduler.release();
        }
    }
    @Override
    public final ConnectionFactory getConnectionFactory() throws ErrorResultException {
        final int index = getInitialConnectionFactoryIndex();
        return getMonitoredConnectionFactory(index);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
@@ -357,9 +329,11 @@
            index = (index + 1) % maxIndex;
        } while (index != initialIndex);
        // All factories are offline so give up. We could have a
        // configurable policy here such as waiting indefinitely, or for a
        // configurable timeout period.
        /*
         * All factories are offline so give up. We could have a configurable
         * policy here such as waiting indefinitely, or for a configurable
         * timeout period.
         */
        throw newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR,
                "No operational connection factories available");
    }
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AuthenticatedConnectionFactory.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS.
 *      Portions copyright 2011-2013 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
@@ -51,43 +51,34 @@
     * An authenticated connection supports all operations except Bind
     * operations.
     */
    public static final class AuthenticatedConnection extends AbstractConnectionWrapper {
    public static final class AuthenticatedConnection extends AbstractConnectionWrapper<Connection> {
        private AuthenticatedConnection(final Connection connection) {
            super(connection);
        }
        /**
        /*
         * Bind operations are not supported by pre-authenticated connections.
         * This method will always throw {@code UnsupportedOperationException}.
         * These methods will always throw {@code UnsupportedOperationException}.
         */
        /**
         * {@inheritDoc}
         */
        /**
         * Not supported: always throws {@code UnsupportedOperationException}
         * without using any of the arguments.
         *
         * @throws UnsupportedOperationException
         *             Always.
         */
        public FutureResult<BindResult> bindAsync(final BindRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super BindResult> resultHandler) {
            throw new UnsupportedOperationException();
        }
        /**
         * {@inheritDoc}
         */
        /**
         * Not supported: always throws {@code UnsupportedOperationException}
         * without using the request.
         *
         * @throws UnsupportedOperationException
         *             Always.
         */
        public BindResult bind(BindRequest request) throws ErrorResultException {
            throw new UnsupportedOperationException();
        }
        /**
         * {@inheritDoc}
         */
        /**
         * Not supported: always throws {@code UnsupportedOperationException}
         * without using the name or password.
         *
         * @throws UnsupportedOperationException
         *             Always.
         */
        public BindResult bind(String name, char[] password) throws ErrorResultException {
            throw new UnsupportedOperationException();
        }
        /**
         * {@inheritDoc}
         */
        public String toString() {
            StringBuilder builder = new StringBuilder();
            builder.append("AuthenticatedConnection(");
@@ -100,11 +91,8 @@
    private static final class FutureResultImpl {
        private final FutureResultTransformer<BindResult, Connection> futureBindResult;
        private final RecursiveFutureResult<Connection, BindResult> futureConnectionResult;
        private final BindRequest bindRequest;
        private Connection connection;
        private FutureResultImpl(final BindRequest request,
@@ -148,7 +136,6 @@
    }
    private final BindRequest request;
    private final ConnectionFactory parentFactory;
    /**
@@ -169,9 +156,12 @@
        this.request = request;
    }
    /**
     * {@inheritDoc}
     */
    /**
     * Closes this factory by closing the parent connection factory; this
     * method does nothing else.
     */
    @Override
    public void close() {
        // Delegate.
        parentFactory.close();
    }
    public Connection getConnection() throws ErrorResultException {
        final Connection connection = parentFactory.getConnection();
        boolean bindSucceeded = false;
@@ -183,14 +173,15 @@
                connection.close();
            }
        }
        // If the bind didn't succeed then an exception will have been thrown
        // and this line will not be reached.
        /*
         * If the bind didn't succeed then an exception will have been thrown
         * and this line will not be reached.
         */
        return new AuthenticatedConnection(connection);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
@@ -200,9 +191,7 @@
        return future.futureBindResult;
    }
    /**
     * {@inheritDoc}
     */
    public String toString() {
        final StringBuilder builder = new StringBuilder();
        builder.append("AuthenticatedConnectionFactory(");
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connection.java
@@ -439,7 +439,10 @@
     *
     * Calling {@code close} on a connection that is already closed has no
     * effect.
     *
     * @see Connections#uncloseable(Connection)
     */
    @Override
    void close();
    /**
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionFactory.java
@@ -22,11 +22,13 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS.
 *      Portions copyright 2011-2013 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import java.io.Closeable;
/**
 * A connection factory provides an interface for obtaining a connection to a
 * Directory Server. Connection factories can be used to wrap other connection
@@ -49,7 +51,28 @@
 * should aim to close connections as soon as possible in order to avoid
 * resource contention.
 */
public interface ConnectionFactory {
public interface ConnectionFactory extends Closeable {
    /**
     * Releases any resources associated with this connection factory. Depending
     * on the implementation a factory may:
     * <ul>
     * <li>do nothing
     * <li>close underlying connection factories (e.g. load-balancers)
     * <li>close pooled connections (e.g. connection pools)
     * <li>shutdown IO event service and related thread pools (e.g. Grizzly).
     * </ul>
     * Calling {@code close} on a connection factory which is already closed has
     * no effect.
     * <p>
     * Applications should avoid closing connection factories while there are
     * remaining active connections in use or connection attempts in progress.
     *
     * @see Connections#uncloseable(ConnectionFactory)
     */
    @Override
    public void close();
    /**
     * Asynchronously obtains a connection to the Directory Server associated
     * with this connection factory. The returned {@code FutureResult} can be
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
@@ -21,13 +21,11 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2011-2012 ForgeRock AS
 *      Copyright 2011-2013 ForgeRock AS
 */
package org.forgerock.opendj.ldap;
import java.io.Closeable;
/**
 * A connection factory which maintains and re-uses a pool of connections.
 * Connections obtained from a connection pool are returned to the connection
@@ -41,7 +39,7 @@
 * Since pooled connections are re-used, applications must use operations such
 * as binds and StartTLS with extreme caution.
 */
public interface ConnectionPool extends ConnectionFactory, Closeable {
public interface ConnectionPool extends ConnectionFactory {
    /**
     * Releases any resources associated with this connection pool. Pooled
     * connections will be permanently closed and this connection pool will no
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -386,6 +386,11 @@
        return new ConnectionFactory() {
            @Override
            public void close() {
                factory.close();
            }
            @Override
            public Connection getConnection() throws ErrorResultException {
                return factory.getConnection();
            }
@@ -396,9 +401,6 @@
                return factory.getConnectionAsync(handler);
            }
            /**
             * {@inheritDoc}
             */
            @Override
            public String toString() {
                return name;
@@ -483,7 +485,7 @@
     * @return An uncloseable view of the provided connection.
     */
    public static Connection uncloseable(Connection connection) {
        return new AbstractConnectionWrapper(connection) {
        return new AbstractConnectionWrapper<Connection>(connection) {
            @Override
            public void close() {
                // Do nothing.
@@ -496,6 +498,36 @@
        };
    }
    /**
     * Returns an uncloseable view of the provided connection factory. Attempts
     * to call {@link ConnectionFactory#close()} will be ignored.
     *
     * @param factory
     *            The connection factory whose {@code close} method is to be
     *            disabled.
     * @return An uncloseable view of the provided connection factory.
     */
    public static ConnectionFactory uncloseable(final ConnectionFactory factory) {
        return new ConnectionFactory() {
            @Override
            public FutureResult<Connection> getConnectionAsync(
                    ResultHandler<? super Connection> handler) {
                return factory.getConnectionAsync(handler);
            }
            @Override
            public Connection getConnection() throws ErrorResultException {
                return factory.getConnection();
            }
            @Override
            public void close() {
                // Do nothing.
            }
        };
    }
    // Prevent instantiation.
    private Connections() {
        // Do nothing.
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -75,18 +75,17 @@
     * the pool completes.
     */
    private final class ConnectionResultHandler implements ResultHandler<Connection> {
        /**
         * {@inheritDoc}
         */
        @Override
        public void handleErrorResult(final ErrorResultException error) {
            // Connection attempt failed, so decrease the pool size.
            currentPoolSize.release();
            if (DEBUG_LOG.isLoggable(Level.FINE)) {
                DEBUG_LOG.fine(String.format("Connection attempt failed: " + error.getMessage()
                        + " currentPoolSize=%d, poolSize=%d", poolSize
                        - currentPoolSize.availablePermits(), poolSize));
                DEBUG_LOG.fine(String.format(
                        "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", error
                                .getMessage(), poolSize - currentPoolSize.availablePermits(),
                        poolSize));
            }
            QueueElement holder;
@@ -103,17 +102,13 @@
            holder.getWaitingFuture().handleErrorResult(error);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void handleResult(final Connection connection) {
            if (DEBUG_LOG.isLoggable(Level.FINE)) {
                DEBUG_LOG.fine(String.format("Connection attempt succeeded: "
                        + " currentPoolSize=%d, poolSize=%d", poolSize
                        - currentPoolSize.availablePermits(), poolSize));
                DEBUG_LOG.fine(String.format(
                        "Connection attempt succeeded:  currentPoolSize=%d, poolSize=%d", poolSize
                                - currentPoolSize.availablePermits(), poolSize));
            }
            publishConnection(connection);
        }
    }
@@ -173,10 +168,12 @@
                notifyErrorOccurred = error != null;
                if (!notifyClose) {
                    if (listeners == null) {
                        // Create and register first listener. If an error has
                        // already occurred on the underlying connection, then
                        // the listener may be immediately invoked so ensure
                        // that it is already in the list.
                        /*
                         * Create and register first listener. If an error has
                         * already occurred on the underlying connection, then
                         * the listener may be immediately invoked so ensure
                         * that it is already in the list.
                         */
                        listeners = new CopyOnWriteArrayList<ConnectionEventListener>();
                        listeners.add(listener);
                        connection.addConnectionEventListener(this);
@@ -235,8 +232,10 @@
                tmpListeners = listeners;
            }
            // Remove underlying listener if needed and do this before
            // subsequent connection events may occur.
            /*
             * Remove underlying listener if needed and do this before
             * subsequent connection events may occur.
             */
            if (tmpListeners != null) {
                connection.removeConnectionEventListener(this);
            }
@@ -245,18 +244,20 @@
            if (connection.isValid()) {
                publishConnection(connection);
            } else {
                // The connection may have been disconnected by the remote
                // server, but the server may still be available. In order to
                // avoid leaving pending futures hanging indefinitely, we should
                // try to reconnect immediately. No need to release/acquire
                // currentPoolSize.
                /*
                 * The connection may have been disconnected by the remote
                 * server, but the server may still be available. In order to
                 * avoid leaving pending futures hanging indefinitely, we should
                 * try to reconnect immediately. No need to release/acquire
                 * currentPoolSize.
                 */
                connection.close();
                factory.getConnectionAsync(connectionResultHandler);
                if (DEBUG_LOG.isLoggable(Level.FINE)) {
                    DEBUG_LOG.fine(String.format("Connection no longer valid. "
                            + "currentPoolSize=%d, poolSize=%d", poolSize
                            - currentPoolSize.availablePermits(), poolSize));
                    DEBUG_LOG.fine(String.format(
                            "Connection no longer valid: currentPoolSize=%d, poolSize=%d", poolSize
                                    - currentPoolSize.availablePermits(), poolSize));
                }
            }
@@ -542,7 +543,9 @@
        }
        QueueElement(final ResultHandler<? super Connection> handler) {
            this.value = new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(handler);
            this.value =
                    new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(
                            handler);
        }
        @Override
@@ -575,24 +578,12 @@
    private final int poolSize;
    private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
    /**
     * Creates a new connection pool which will maintain {@code poolSize}
     * connections created using the provided connection factory.
     *
     * @param factory
     *            The connection factory to use for creating new connections.
     * @param poolSize
     *            The maximum size of the connection pool.
     */
    FixedConnectionPool(final ConnectionFactory factory, final int poolSize) {
        this.factory = factory;
        this.poolSize = poolSize;
        this.currentPoolSize = new Semaphore(poolSize);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close() {
        final LinkedList<Connection> idleConnections;
@@ -602,8 +593,10 @@
            }
            isClosed = true;
            // Remove any connections which are waiting in the queue as these
            // can be closed immediately.
            /*
             * Remove any connections which are waiting in the queue as these
             * can be closed immediately.
             */
            idleConnections = new LinkedList<Connection>();
            while (hasWaitingConnections()) {
                final QueueElement holder = queue.removeFirst();
@@ -621,11 +614,11 @@
        for (final Connection connection : idleConnections) {
            closeConnection(connection);
        }
        // Close the underlying factory.
        factory.close();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public Connection getConnection() throws ErrorResultException {
        try {
@@ -635,9 +628,6 @@
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
@@ -672,9 +662,9 @@
                    currentPoolSize.release();
                    if (DEBUG_LOG.isLoggable(Level.FINE)) {
                        DEBUG_LOG.fine(String.format("Connection no longer valid. "
                                + "currentPoolSize=%d, poolSize=%d", poolSize
                                - currentPoolSize.availablePermits(), poolSize));
                        DEBUG_LOG.fine(String.format(
                                "Connection no longer valid: currentPoolSize=%d, poolSize=%d",
                                poolSize - currentPoolSize.availablePermits(), poolSize));
                    }
                }
            } else {
@@ -688,9 +678,6 @@
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
@@ -719,7 +706,7 @@
        if (DEBUG_LOG.isLoggable(Level.FINE)) {
            DEBUG_LOG.fine(String.format("Closing connection because connection pool is closing: "
                    + " currentPoolSize=%d, poolSize=%d", poolSize
                    + "currentPoolSize=%d, poolSize=%d", poolSize
                    - currentPoolSize.availablePermits(), poolSize));
        }
    }
@@ -763,14 +750,14 @@
                holder.getWaitingFuture().handleErrorResult(e);
                if (DEBUG_LOG.isLoggable(Level.FINE)) {
                    DEBUG_LOG.fine(String.format("Connection attempt failed: " + e.getMessage()
                            + " currentPoolSize=%d, poolSize=%d", poolSize
                            - currentPoolSize.availablePermits(), poolSize));
                    DEBUG_LOG.fine(String.format(
                            "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", e
                                    .getMessage(), poolSize - currentPoolSize.availablePermits(),
                            poolSize));
                }
            }
        } else {
            final PooledConnection pooledConnection = new PooledConnection(connection);
            holder.getWaitingFuture().handleResult(pooledConnection);
            holder.getWaitingFuture().handleResult(new PooledConnection(connection));
        }
    }
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS.
 *      Portions copyright 2011-2013 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
@@ -40,6 +40,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.logging.Level;
@@ -64,7 +65,7 @@
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.FutureResultTransformer;
import com.forgerock.opendj.util.StaticUtils;
import com.forgerock.opendj.util.ReferenceCountedObject;
import com.forgerock.opendj.util.Validator;
/**
@@ -75,7 +76,7 @@
    /**
     * A connection that sends heart beats and supports all operations.
     */
    private final class ConnectionImpl extends AbstractConnectionWrapper implements
    private final class ConnectionImpl extends AbstractConnectionWrapper<Connection> implements
            ConnectionEventListener, SearchResultHandler {
        /**
@@ -85,9 +86,8 @@
         * @param <R>
         *            The type of result returned by the request.
         */
        private abstract class DelayedFuture<R extends Result>
                extends AsynchronousFutureResult<R, ResultHandler<? super R>>
                implements Runnable {
        private abstract class DelayedFuture<R extends Result> extends
                AsynchronousFutureResult<R, ResultHandler<? super R>> implements Runnable {
            private volatile FutureResult<R> innerFuture = null;
            protected DelayedFuture(final ResultHandler<? super R> handler) {
@@ -123,14 +123,19 @@
        }
        // List of pending Bind or StartTLS requests which must be invoked
        // when the current heart beat completes.
        /*
         * List of pending Bind or StartTLS requests which must be invoked when
         * the current heart beat completes.
         */
        private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>();
        // Coordinates heart-beats with Bind and StartTLS requests.
        /* Coordinates heart-beats with Bind and StartTLS requests. */
        private final Sync sync = new Sync();
        // Timestamp of last response received (any response, not just heart beats).
        /*
         * Timestamp of last response received (any response, not just heart
         * beats).
         */
        private volatile long timestamp = currentTimeMillis(); // Assume valid at creation.
        private ConnectionImpl(final Connection connection) {
@@ -206,8 +211,10 @@
                return connection.bindAsync(request, intermediateResponseHandler, timestamper(
                        resultHandler, true));
            } else {
                // A heart beat must be in progress so create a runnable task
                // which will be executed when the heart beat completes.
                /*
                 * A heart beat must be in progress so create a runnable task
                 * which will be executed when the heart beat completes.
                 */
                final DelayedFuture<BindResult> future =
                        new DelayedFuture<BindResult>(resultHandler) {
                            @Override
@@ -216,7 +223,10 @@
                                        timestamper(this, true));
                            }
                        };
                // Enqueue and flush if the heart beat has completed in the mean time.
                /*
                 * Enqueue and flush if the heart beat has completed in the mean
                 * time.
                 */
                pendingRequests.offer(future);
                flushPendingRequests();
                return future;
@@ -342,8 +352,11 @@
                    return connection.extendedRequestAsync(request, intermediateResponseHandler,
                            timestamper(resultHandler, true));
                } else {
                    // A heart beat must be in progress so create a runnable task
                    // which will be executed when the heart beat completes.
                    /*
                     * A heart beat must be in progress so create a runnable
                     * task which will be executed when the heart beat
                     * completes.
                     */
                    final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
                        @Override
                        public FutureResult<R> dispatch() {
@@ -351,7 +364,11 @@
                                    intermediateResponseHandler, timestamper(this, true));
                        }
                    };
                    // Enqueue and flush if the heart beat has completed in the mean time.
                    /*
                     * Enqueue and flush if the heart beat has completed in the
                     * mean time.
                     */
                    pendingRequests.offer(future);
                    flushPendingRequests();
                    return future;
@@ -382,7 +399,7 @@
        @Override
        public void handleErrorResult(final ErrorResultException error) {
            if (DEBUG_LOG.isLoggable(Level.FINE)) {
                DEBUG_LOG.fine(String.format("Heartbeat failed: " + error.getMessage()));
                DEBUG_LOG.fine(String.format("Heartbeat failed: %s", error.getMessage()));
            }
            updateTimestamp();
            releaseHeartBeatLock();
@@ -582,8 +599,10 @@
        }
        private void acquireBindOrStartTLSLock() throws ErrorResultException {
            // Wait for pending heartbeats and prevent new heartbeats from
            // being sent while the bind is in progress.
            /*
             * Wait for pending heartbeats and prevent new heartbeats from being
             * sent while the bind is in progress.
             */
            try {
                if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) {
                    // Give up - it looks like the connection is dead.
@@ -597,8 +616,11 @@
        private void flushPendingRequests() {
            if (!pendingRequests.isEmpty()) {
                // The pending requests will acquire the shared lock, but we take
                // it here anyway to ensure that pending requests do not get blocked.
                /*
                 * The pending requests will acquire the shared lock, but we
                 * take it here anyway to ensure that pending requests do not
                 * get blocked.
                 */
                if (sync.tryLockShared()) {
                    try {
                        Runnable pendingRequest;
@@ -617,7 +639,10 @@
                connection.removeConnectionEventListener(this);
                activeConnections.remove(this);
                if (activeConnections.isEmpty()) {
                    // This is the last active connection, so stop the heartbeat.
                    /*
                     * This is the last active connection, so stop the
                     * heartbeat.
                     */
                    heartBeatFuture.cancel(false);
                }
            }
@@ -633,22 +658,33 @@
        }
        private void sendHeartBeat() {
            // Only send the heartbeat if the connection has been idle for some time.
            /*
             * Only send the heartbeat if the connection has been idle for some
             * time.
             */
            if (currentTimeMillis() < (timestamp + minDelayMS)) {
                return;
            }
            // Don't send a heart beat if there is already a heart beat,
            // bind, or startTLS in progress. Note that the bind/startTLS
            // response will update the timestamp as if it were a heart beat.
            /*
             * Don't send a heart beat if there is already a heart beat, bind,
             * or startTLS in progress. Note that the bind/startTLS response
             * will update the timestamp as if it were a heart beat.
             */
            if (sync.tryLockExclusively()) {
                try {
                    connection.searchAsync(heartBeatRequest, null, this);
                } catch (final Exception e) {
                    // This may happen when we attempt to send the heart beat just
                    // after the connection is closed but before we are notified.
                    /*
                     * This may happen when we attempt to send the heart beat
                     * just after the connection is closed but before we are
                     * notified.
                     */
                    // Release the lock because we're never going to get a response.
                    /*
                     * Release the lock because we're never going to get a
                     * response.
                     */
                    releaseHeartBeatLock();
                }
            }
@@ -755,7 +791,7 @@
     * </ul>
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        // Lock states. Positive values indicate that the shared lock is taken.
        /* Lock states. Positive values indicate that the shared lock is taken. */
        private static final int UNLOCKED = 0; // initial state
        private static final int LOCKED_EXCLUSIVELY = -1;
@@ -809,8 +845,10 @@
                }
                final int newState = state - 1;
                if (compareAndSetState(state, newState)) {
                    // We could always return true here, but since there cannot
                    // be waiting readers we can specialize for waiting writers.
                    /*
                     * We could always return true here, but since there cannot
                     * be waiting readers we can specialize for waiting writers.
                     */
                    return newState == UNLOCKED;
                }
            }
@@ -851,83 +889,29 @@
    private final SearchRequest heartBeatRequest;
    private final long interval;
    private final long minDelayMS;
    private final ScheduledExecutorService scheduler;
    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
    private final long timeoutMS;
    private final TimeUnit unit;
    private AtomicBoolean isClosed = new AtomicBoolean();
    /**
     * Creates a new heart-beat connection factory which will create connections
     * using the provided connection factory and periodically ping any created
     * connections in order to detect that they are still alive every 10 seconds
     * using the default scheduler.
     *
     * @param factory
     *            The connection factory to use for creating connections.
     */
    HeartBeatConnectionFactory(final ConnectionFactory factory) {
        this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler());
        this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, null);
    }
    /**
     * Creates a new heart-beat connection factory which will create connections
     * using the provided connection factory and periodically ping any created
     * connections in order to detect that they are still alive using the
     * specified frequency and the default scheduler.
     *
     * @param factory
     *            The connection factory to use for creating connections.
     * @param interval
     *            The interval between keepalive pings.
     * @param unit
     *            The time unit for the interval between keepalive pings.
     */
    HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
            final TimeUnit unit) {
        this(factory, interval, unit, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler());
        this(factory, interval, unit, DEFAULT_SEARCH, null);
    }
    /**
     * Creates a new heart-beat connection factory which will create connections
     * using the provided connection factory and periodically ping any created
     * connections using the specified search request in order to detect that
     * they are still alive.
     *
     * @param factory
     *            The connection factory to use for creating connections.
     * @param interval
     *            The interval between keepalive pings.
     * @param unit
     *            The time unit for the interval between keepalive pings.
     * @param heartBeat
     *            The search request to use for keepalive pings.
     */
    HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
            final TimeUnit unit, final SearchRequest heartBeat) {
        this(factory, interval, unit, heartBeat, StaticUtils.getDefaultScheduler());
        this(factory, interval, unit, heartBeat, null);
    }
    /**
     * Creates a new heart-beat connection factory which will create connections
     * using the provided connection factory and periodically ping any created
     * connections using the specified search request in order to detect that
     * they are still alive.
     *
     * @param factory
     *            The connection factory to use for creating connections.
     * @param interval
     *            The interval between keepalive pings.
     * @param unit
     *            The time unit for the interval between keepalive pings.
     * @param heartBeat
     *            The search request to use for keepalive pings.
     * @param scheduler
     *            The scheduler which should be used for periodically sending
     *            keepalive pings.
     */
    HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
            final TimeUnit unit, final SearchRequest heartBeat,
            final ScheduledExecutorService scheduler) {
        Validator.ensureNotNull(factory, heartBeat, unit, scheduler);
        Validator.ensureNotNull(factory, heartBeat, unit);
        Validator.ensureTrue(interval >= 0, "negative timeout");
        this.heartBeatRequest = heartBeat;
@@ -935,22 +919,34 @@
        this.unit = unit;
        this.activeConnections = new LinkedList<ConnectionImpl>();
        this.factory = factory;
        this.scheduler = scheduler;
        this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
        this.timeoutMS = unit.toMillis(interval) * 2;
        this.minDelayMS = unit.toMillis(interval) / 2;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close() {
        if (isClosed.compareAndSet(false, true)) {
            synchronized (activeConnections) {
                if (!activeConnections.isEmpty()) {
                    if (DEBUG_LOG.isLoggable(Level.FINE)) {
                        DEBUG_LOG.fine(String.format(
                                "HeartbeatConnectionFactory '%s' is closing while %d "
                                        + "active connections remain", toString(),
                                activeConnections.size()));
                    }
                }
            }
            scheduler.release();
            factory.close();
        }
    }
    @Override
    public Connection getConnection() throws ErrorResultException {
        return adaptConnection(factory.getConnection());
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
@@ -967,9 +963,6 @@
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
@@ -984,8 +977,8 @@
        synchronized (activeConnections) {
            connection.addConnectionEventListener(heartBeatConnection);
            if (activeConnections.isEmpty()) {
                // This is the first active connection, so start the heart beat.
                heartBeatFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
                /* This is the first active connection, so start the heart beat. */
                heartBeatFuture = scheduler.get().scheduleWithFixedDelay(new Runnable() {
                    @Override
                    public void run() {
                        final ConnectionImpl[] tmp;
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/InternalConnectionFactory.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS.
 *      Portions copyright 2011-2013 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
@@ -50,9 +50,7 @@
 *            The type of client context.
 */
final class InternalConnectionFactory<C> implements ConnectionFactory {
    private final ServerConnectionFactory<C, Integer> factory;
    private final C clientContext;
    InternalConnectionFactory(final ServerConnectionFactory<C, Integer> factory,
@@ -61,17 +59,16 @@
        this.clientContext = clientContext;
    }
    /**
     * {@inheritDoc}
     * <p>
     * This implementation is a no-op.
     */
    @Override
    public void close() {
        // Nothing to do.
    }
    public Connection getConnection() throws ErrorResultException {
        final ServerConnection<Integer> serverConnection = factory.handleAccept(clientContext);
        return new InternalConnection(serverConnection);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
        final ServerConnection<Integer> serverConnection;
@@ -91,9 +88,6 @@
        return new CompletedFutureResult<Connection>(connection);
    }
    /**
     * {@inheritDoc}
     */
    public String toString() {
        final StringBuilder builder = new StringBuilder();
        builder.append("InternalConnectionFactory(");
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS.
 *      Portions copyright 2011-2013 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
@@ -39,9 +39,10 @@
 * Server.
 */
public final class LDAPConnectionFactory implements ConnectionFactory {
    // We implement the factory using the pimpl idiom in order to avoid making
    // too many implementation classes public.
    /*
     * We implement the factory using the pimpl idiom in order to avoid making
     * too many implementation classes public.
     */
    private final LDAPConnectionFactoryImpl impl;
    /**
@@ -125,18 +126,17 @@
        }
    }
    /**
     * {@inheritDoc}
     * <p>
     * This implementation delegates to the underlying factory implementation.
     */
    @Override
    public void close() {
        impl.close();
    }
    /**
     * Asynchronously obtains a connection by delegating to the underlying
     * factory implementation.
     *
     * @param handler
     *            The result handler, passed through unchanged to the
     *            underlying implementation.
     * @return The future result returned by the underlying implementation.
     */
    @Override
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
        return impl.getConnectionAsync(handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public Connection getConnection() throws ErrorResultException {
        return impl.getConnection();
@@ -183,9 +183,6 @@
        return impl.getSocketAddress();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        return impl.toString();
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS.
 *      Portions copyright 2011-2013 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
@@ -37,28 +37,22 @@
final class LoadBalancer implements ConnectionFactory {
    private final LoadBalancingAlgorithm algorithm;
    /**
     * Creates a new load balancer using the provided algorithm.
     *
     * @param algorithm
     *            The load balancing algorithm which will be used to obtain the
     *            next connection factory.
     */
    public LoadBalancer(final LoadBalancingAlgorithm algorithm) {
    LoadBalancer(final LoadBalancingAlgorithm algorithm) {
        Validator.ensureNotNull(algorithm);
        this.algorithm = algorithm;
    }
    /**
     * {@inheritDoc}
     * <p>
     * Closing the load balancer closes its load balancing algorithm.
     */
    @Override
    public void close() {
        // Delegate to the algorithm.
        algorithm.close();
    }
    @Override
    public Connection getConnection() throws ErrorResultException {
        return algorithm.getConnectionFactory().getConnection();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> resultHandler) {
@@ -76,9 +70,7 @@
        return factory.getConnectionAsync(resultHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
        builder.append("LoadBalancer(");
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancingAlgorithm.java
@@ -22,17 +22,28 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import java.io.Closeable;
/**
 * A load balancing algorithm distributes connection requests across one or more
 * underlying connection factories in an implementation defined manner.
 *
 * @see Connections#newLoadBalancer(LoadBalancingAlgorithm) newLoadBalancer
 */
public interface LoadBalancingAlgorithm {
public interface LoadBalancingAlgorithm extends Closeable {
    /**
     * Releases any resources associated with this algorithm, including any
     * associated connection factories.
     */
    @Override
    public void close();
    /**
     * Returns a connection factory which should be used in order to satisfy the
     * next connection request.
opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransportTestCase.java
@@ -22,11 +22,12 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2012 ForgeRock AS.
 *      Portions copyright 2012-2013 ForgeRock AS.
 */
package com.forgerock.opendj.ldap;
import static com.forgerock.opendj.ldap.DefaultTCPNIOTransport.DEFAULT_TRANSPORT;
import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
import static org.testng.Assert.assertTrue;
@@ -36,6 +37,8 @@
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.testng.annotations.Test;
import com.forgerock.opendj.util.ReferenceCountedObject;
/**
 * Tests DefaultTCPNIOTransport class.
 */
@@ -52,9 +55,10 @@
    @Test(enabled = false)
    public void testGetInstance() throws Exception {
        // Create a transport.
        final TCPNIOTransport transport = DefaultTCPNIOTransport.getInstance();
        final ReferenceCountedObject<TCPNIOTransport>.Reference transport =
                DEFAULT_TRANSPORT.acquire();
        SocketAddress socketAddress = findFreeSocketAddress();
        transport.bind(socketAddress);
        transport.get().bind(socketAddress);
        // Establish a socket connection to see if the transport factory works.
        final Socket socket = new Socket();
@@ -66,6 +70,7 @@
            // Don't stop the transport because it is shared with the ldap server.
        } finally {
            socket.close();
            transport.release();
        }
    }
}
opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/ReferenceCountedObjectTestCase.java
New file
@@ -0,0 +1,151 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions copyright 2013 ForgeRock AS.
 */
package com.forgerock.opendj.util;
import static org.fest.assertions.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import org.testng.annotations.Test;
/**
 * This Test Class tests {@link ReferenceCountedObject}.
 */
@SuppressWarnings("javadoc")
public class ReferenceCountedObjectTestCase extends UtilTestCase {
    private interface Impl {
        void destroyInstance(Object instance);
        Object newInstance();
    }
    private final Object object = "Test Object";
    @Test
    public void testAcquire() throws Exception {
        final Impl impl = mock(Impl.class);
        when(impl.newInstance()).thenReturn(object);
        final ReferenceCountedObject<Object> rco = rco(impl);
        // First acquisition should create new instance.
        final ReferenceCountedObject<Object>.Reference ref1 = rco.acquire();
        assertThat(ref1.get()).isSameAs(object);
        verify(impl).newInstance();
        verifyNoMoreInteractions(impl);
        // Second acquisition should just bump the ref count.
        final ReferenceCountedObject<Object>.Reference ref2 = rco.acquire();
        assertThat(ref2.get()).isSameAs(object);
        verifyNoMoreInteractions(impl);
        // First dereference should just decrease the ref count.
        ref1.release();
        verifyNoMoreInteractions(impl);
        // Second dereference should destroy the instance.
        ref2.release();
        verify(impl).destroyInstance(object);
        verifyNoMoreInteractions(impl);
    }
    @Test
    public void testAcquireIfNull() throws Exception {
        final Object otherObject = "Other object";
        final Impl impl = mock(Impl.class);
        when(impl.newInstance()).thenReturn(object);
        final ReferenceCountedObject<Object> rco = rco(impl);
        final ReferenceCountedObject<Object>.Reference ref = rco.acquireIfNull(otherObject);
        verify(impl, never()).newInstance();
        assertThat(ref.get()).isSameAs(otherObject);
        ref.release();
        verifyNoMoreInteractions(impl);
    }
    /**
     * This test attempts to test that finalization works. It loops at most 100
     * times performing GCs and checking to see if the finalizer was called.
     * Usually objects are finalized after 2 GCs, so the loop should complete
     * quite quickly.
     *
     * @throws Exception
     *             If an unexpected error occurred.
     */
    @Test
    public void testFinalization() throws Exception {
        final Impl impl = mock(Impl.class);
        when(impl.newInstance()).thenReturn(object);
        final ReferenceCountedObject<Object> rco = rco(impl);
        ReferenceCountedObject<Object>.Reference ref = rco.acquire();
        System.gc();
        System.gc();
        verify(impl, never()).destroyInstance(object);
        // Read in order to prevent optimization.
        if (ref != null) {
            ref = null;
        }
        for (int i = 0; i < 100; i++) {
            System.gc();
            try {
                verify(impl).destroyInstance(object);
                break; // Finalized so stop.
            } catch (final Throwable t) {
                // Retry.
            }
        }
        verify(impl).destroyInstance(object);
    }
    @Test(expectedExceptions = NullPointerException.class)
    public void testStaleReference() throws Exception {
        final Impl impl = mock(Impl.class);
        when(impl.newInstance()).thenReturn(object);
        final ReferenceCountedObject<Object> rco = rco(impl);
        final ReferenceCountedObject<Object>.Reference ref = rco.acquire();
        ref.release();
        ref.get();
    }
    private ReferenceCountedObject<Object> rco(final Impl impl) {
        return new ReferenceCountedObject<Object>() {
            @Override
            protected void destroyInstance(final Object instance) {
                impl.destroyInstance(instance);
            }
            @Override
            protected Object newInstance() {
                return impl.newInstance();
            }
        };
    }
}
opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthenticatedConnectionFactory.java
@@ -67,10 +67,9 @@
     * An authenticated connection supports all operations except Bind
     * operations.
     */
    static final class AuthenticatedConnection extends AbstractConnectionWrapper {
    static final class AuthenticatedConnection extends AbstractConnectionWrapper<Connection> {
        private final BindRequest request;
        private volatile BindResult result;
        private AuthenticatedConnection(final Connection connection, final BindRequest request,
@@ -80,28 +79,19 @@
            this.result = result;
        }
        /**
        /*
         * Bind operations are not supported by pre-authenticated connections.
         * This method will always throw {@code UnsupportedOperationException}.
         * These methods will always throw {@code UnsupportedOperationException}.
         */
        /**
         * {@inheritDoc}
         */
        public BindResult bind(BindRequest request) throws ErrorResultException {
            // Bind operations are not supported on a pre-authenticated
            // connection, so this always fails.
            throw new UnsupportedOperationException();
        }
        /**
         * {@inheritDoc}
         */
        public BindResult bind(String name, char[] password) throws ErrorResultException {
            // Bind operations are not supported on a pre-authenticated
            // connection, so this always fails.
            throw new UnsupportedOperationException();
        }
        /**
         * {@inheritDoc}
         */
        public FutureResult<BindResult> bindAsync(BindRequest request,
                IntermediateResponseHandler intermediateResponseHandler,
                ResultHandler<? super BindResult> resultHandler) {
@@ -140,15 +130,19 @@
                throw new UnsupportedOperationException();
            }
            // Wrap the client handler so that we can update the connection
            // state.
            /*
             * Wrap the client handler so that we can update the connection
             * state.
             */
            final ResultHandler<? super BindResult> clientHandler = handler;
            final ResultHandler<BindResult> handlerWrapper = new ResultHandler<BindResult>() {
                public void handleErrorResult(final ErrorResultException error) {
                    // This connection is now unauthenticated so prevent
                    // further use.
                    /*
                     * This connection is now unauthenticated so prevent further
                     * use.
                     */
                    connection.close();
                    if (clientHandler != null) {
@@ -170,9 +164,6 @@
            return connection.bindAsync(request, null, handlerWrapper);
        }
        /**
         * {@inheritDoc}
         */
        public String toString() {
            StringBuilder builder = new StringBuilder();
            builder.append("AuthenticatedConnection(");
@@ -255,9 +246,11 @@
        this.request = request;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close() {
        // Closing this factory is delegated entirely to the parent factory.
        parentFactory.close();
    }
    public Connection getConnection() throws ErrorResultException {
        final Connection connection = parentFactory.getConnection();
        BindResult bindResult = null;
@@ -268,14 +261,14 @@
                connection.close();
            }
        }
        // If the bind didn't succeed then an exception will have been thrown
        // and this line will not be reached.
        /*
         * If the bind didn't succeed then an exception will have been thrown
         * and this line will not be reached.
         */
        return new AuthenticatedConnection(connection, request, bindResult);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
        final FutureResultImpl future = new FutureResultImpl(request, handler);
@@ -317,9 +310,6 @@
        return this;
    }
    /**
     * {@inheritDoc}
     */
    public String toString() {
        final StringBuilder builder = new StringBuilder();
        builder.append("AuthenticatedConnectionFactory(");
opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPAuthnFilter.java
@@ -104,17 +104,16 @@
    private boolean supportHTTPBasicAuthentication = true;
    private ServletApiVersionAdapter syncFactory;
    /**
     * {@inheritDoc}
     */
    @Override
    public void destroy() {
        // Release the resources maintained by the filter: close each LDAP
        // connection factory that was actually created (non-null).
        if (searchLDAPConnectionFactory != null) {
            searchLDAPConnectionFactory.close();
        }
        if (bindLDAPConnectionFactory != null) {
            bindLDAPConnectionFactory.close();
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void doFilter(final ServletRequest request, final ServletResponse response,
            final FilterChain chain) throws IOException, ServletException {
@@ -295,9 +294,6 @@
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void init(final FilterConfig config) throws ServletException {
        // FIXME: make it possible to configure the filter externally, especially
opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPConnectionFactoryProvider.java
@@ -15,6 +15,9 @@
 */
package org.forgerock.opendj.rest2ldap.servlet;
import static org.forgerock.json.resource.Resources.newInternalConnectionFactory;
import static org.forgerock.opendj.rest2ldap.Rest2LDAP.configureConnectionFactory;
import java.io.InputStream;
import java.util.Map;
@@ -25,8 +28,11 @@
import org.codehaus.jackson.map.ObjectMapper;
import org.forgerock.json.fluent.JsonValue;
import org.forgerock.json.resource.CollectionResourceProvider;
import org.forgerock.json.resource.Connection;
import org.forgerock.json.resource.ConnectionFactory;
import org.forgerock.json.resource.Resources;
import org.forgerock.json.resource.FutureResult;
import org.forgerock.json.resource.ResourceException;
import org.forgerock.json.resource.ResultHandler;
import org.forgerock.json.resource.Router;
import org.forgerock.opendj.rest2ldap.AuthorizationPolicy;
import org.forgerock.opendj.rest2ldap.Rest2LDAP;
@@ -90,8 +96,8 @@
            final org.forgerock.opendj.ldap.ConnectionFactory ldapFactory;
            if (ldapFactoryName != null) {
                ldapFactory =
                        Rest2LDAP.configureConnectionFactory(configuration.get(
                                "ldapConnectionFactories").required(), ldapFactoryName);
                        configureConnectionFactory(configuration.get("ldapConnectionFactories")
                                .required(), ldapFactoryName);
            } else {
                ldapFactory = null;
            }
@@ -107,7 +113,33 @@
                                .configureMapping(mapping).build();
                router.addRoute(mappingUrl, provider);
            }
            return Resources.newInternalConnectionFactory(router);
            final ConnectionFactory factory = newInternalConnectionFactory(router);
            if (ldapFactory != null) {
                /*
                 * Return a wrapper which will release resources associated with
                 * the LDAP connection factory (pooled connections, transport,
                 * etc).
                 */
                return new ConnectionFactory() {
                    // Connection requests are forwarded unchanged to the
                    // internal connection factory built from the router.
                    @Override
                    public FutureResult<Connection> getConnectionAsync(
                            ResultHandler<Connection> handler) {
                        return factory.getConnectionAsync(handler);
                    }
                    @Override
                    public Connection getConnection() throws ResourceException {
                        return factory.getConnection();
                    }
                    // Closing the wrapper closes the LDAP connection factory.
                    // NOTE(review): the wrapped internal factory itself is not
                    // closed here - confirm it holds no resources of its own.
                    @Override
                    public void close() {
                        ldapFactory.close();
                    }
                };
            } else {
                return factory;
            }
        } catch (final ServletException e) {
            // Rethrow.
            throw e;
opendj3/opendj-server2x-adapter/src/main/java/org/forgerock/opendj/adapter/server2x/Adapters.java
@@ -145,6 +145,11 @@
        ConnectionFactory factory = new ConnectionFactory() {
            @Override
            public void close() {
                // Nothing to do.
            }
            @Override
            public FutureResult<Connection> getConnectionAsync(
                    ResultHandler<? super Connection> handler) {
                if (handler != null) {