mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
01.35.2012 398a2eaf655e77b39fe71b644d95802c30749e0e
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -122,17 +122,17 @@
     * the client application closes this connection. More specifically, pooled
     * connections are not actually stored in the internal queue.
     */
    private final class PooledConnection implements Connection {
        // Connection event listeners registered against this pooled connection
        // should have the same life time as the pooled connection.
        private final List<ConnectionEventListener> listeners =
                new CopyOnWriteArrayList<ConnectionEventListener>();
    private final class PooledConnection implements Connection, ConnectionEventListener {
        private final Connection connection;
        private ErrorResultException error = null;
        private final AtomicBoolean isClosed = new AtomicBoolean(false);
        private boolean isDisconnectNotification = false;
        private List<ConnectionEventListener> listeners = null;
        // Guarded by stateLock.
        private final Object stateLock = new Object();
        PooledConnection(final Connection connection) {
        private PooledConnection(final Connection connection) {
            this.connection = connection;
        }
@@ -163,14 +163,34 @@
            return checkState().addAsync(request, intermediateResponseHandler, resultHandler);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void addConnectionEventListener(final ConnectionEventListener listener) {
            Validator.ensureNotNull(listener);
            checkState();
            listeners.add(listener);
            final boolean notifyClose;
            final boolean notifyErrorOccurred;
            synchronized (stateLock) {
                notifyClose = isClosed.get();
                notifyErrorOccurred = error != null;
                if (!notifyClose) {
                    if (listeners == null) {
                        // Create and register first listener. If an error has
                        // already occurred on the underlying connection, then
                        // the listener may be immediately invoked so ensure
                        // that it is already in the list.
                        listeners = new CopyOnWriteArrayList<ConnectionEventListener>();
                        listeners.add(listener);
                        connection.addConnectionEventListener(this);
                    } else {
                        listeners.add(listener);
                    }
                }
            }
            if (notifyErrorOccurred) {
                listener.handleConnectionError(isDisconnectNotification, error);
            }
            if (notifyClose) {
                listener.handleConnectionClosed();
            }
        }
        @Override
@@ -191,14 +211,21 @@
            return checkState().bindAsync(request, intermediateResponseHandler, resultHandler);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void close() {
            if (!isClosed.compareAndSet(false, true)) {
                // Already closed.
                return;
            final List<ConnectionEventListener> tmpListeners;
            synchronized (stateLock) {
                if (!isClosed.compareAndSet(false, true)) {
                    // Already closed.
                    return;
                }
                tmpListeners = listeners;
            }
            // Remove underlying listener if needed and do this before
            // subsequent connection events may occur.
            if (tmpListeners != null) {
                connection.removeConnectionEventListener(this);
            }
            // Don't put invalid connections back in the pool.
@@ -222,11 +249,15 @@
                            - currentPoolSize.availablePermits(), poolSize));
                }
            }
            // Invoke listeners.
            if (tmpListeners != null) {
                for (final ConnectionEventListener listener : tmpListeners) {
                    listener.handleConnectionClosed();
                }
            }
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void close(final UnbindRequest request, final String reason) {
            close();
@@ -294,17 +325,51 @@
                    resultHandler);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void handleConnectionClosed() {
            /*
             * The underlying connection was closed by the client. This can only
             * occur when the pool is being shut down and the underlying
             * connection is not in use.
             */
            throw new IllegalStateException(
                    "Pooled connection received unexpected close notification");
        }
        @Override
        public void handleConnectionError(final boolean isDisconnectNotification,
                final ErrorResultException error) {
            final List<ConnectionEventListener> tmpListeners;
            synchronized (stateLock) {
                tmpListeners = listeners;
                this.isDisconnectNotification = isDisconnectNotification;
                this.error = error;
            }
            if (tmpListeners != null) {
                for (final ConnectionEventListener listener : tmpListeners) {
                    listener.handleConnectionError(isDisconnectNotification, error);
                }
            }
        }
        @Override
        public void handleUnsolicitedNotification(final ExtendedResult notification) {
            final List<ConnectionEventListener> tmpListeners;
            synchronized (stateLock) {
                tmpListeners = listeners;
            }
            if (tmpListeners != null) {
                for (final ConnectionEventListener listener : tmpListeners) {
                    listener.handleUnsolicitedNotification(notification);
                }
            }
        }
        @Override
        public boolean isClosed() {
            return isClosed.get();
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public boolean isValid() {
            return connection.isValid() && !isClosed();
@@ -363,14 +428,14 @@
            return checkState().readEntryAsync(name, attributeDescriptions, handler);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void removeConnectionEventListener(final ConnectionEventListener listener) {
            Validator.ensureNotNull(listener);
            checkState();
            listeners.remove(listener);
            synchronized (stateLock) {
                if (listeners != null) {
                    listeners.remove(listener);
                }
            }
        }
        @Override
@@ -430,9 +495,6 @@
            return checkState().searchSingleEntryAsync(request, handler);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public String toString() {
            final StringBuilder builder = new StringBuilder();
@@ -495,19 +557,19 @@
        }
    }
    // Guarded by queue.
    private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
    private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
    private final Semaphore currentPoolSize;
    private final ConnectionFactory factory;
    // Guarded by queue.
    private boolean isClosed = false;
    private final ConnectionFactory factory;
    private final int poolSize;
    private final Semaphore currentPoolSize;
    private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
    // Guarded by queue.
    private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
    /**
     * Creates a new connection pool which will maintain {@code poolSize}
@@ -564,7 +626,7 @@
    public Connection getConnection() throws ErrorResultException {
        try {
            return getConnectionAsync(null).get();
        } catch (InterruptedException e) {
        } catch (final InterruptedException e) {
            throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
        }
    }