opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/proxy/Main.java
@@ -559,10 +559,10 @@ final String remoteAddress = args[i]; final int remotePort = Integer.parseInt(args[i + 1]); factories.add(Connections.newConnectionPool( factories.add(Connections.newFixedConnectionPool( new LDAPConnectionFactory(remoteAddress, remotePort), Integer.MAX_VALUE)); bindFactories.add(Connections.newConnectionPool( bindFactories.add(Connections.newFixedConnectionPool( new LDAPConnectionFactory(remoteAddress, remotePort), Integer.MAX_VALUE)); } opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
@@ -22,806 +22,86 @@ * CDDL HEADER END * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS * Copyright 2011 ForgeRock AS */ package org.forgerock.opendj.ldap; import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import org.forgerock.opendj.ldap.requests.*; import org.forgerock.opendj.ldap.responses.*; import com.forgerock.opendj.util.*; import java.io.Closeable; /** * A simple connection pool implementation. * A connection factory which maintains and re-uses a pool of connections. * Connections obtained from a connection pool are returned to the connection * pool when closed, although connection pool implementations may choose to * physically close the connection if needed (e.g. in order to reduce the size * of the pool). * <p> * When connection pools are no longer needed they must be explicitly closed in * order to close any remaining pooled connections. * <p> * Since pooled connections are re-used, applications must use operations such * as binds and StartTLS with extreme caution. */ final class ConnectionPool extends AbstractConnectionFactory public interface ConnectionPool extends ConnectionFactory, Closeable { /** * This result handler is invoked when an attempt to add a new connection to * the pool completes. * Releases any resources associated with this connection pool. Pooled * connections will be permanently closed and this connection pool will no * longer be available for use. * <p> * Attempts to use this connection pool after it has been closed will result * in an {@code IllegalStateException}. * <p> * Calling {@code close} on a connection pool which is already closed has no * effect. 
*/ private final class ConnectionResultHandler implements ResultHandler<AsynchronousConnection> { /** * {@inheritDoc} */ @Override public void handleErrorResult(final ErrorResultException error) { // Connection attempt failed, so decrease the pool size. currentPoolSize.release(); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format( "Connection attempt failed: " + error.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } QueueElement holder; synchronized (queue) { if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) { // No waiting futures. return; } else { holder = queue.removeFirst(); } } // There was waiting future, so close it. holder.getWaitingFuture().handleErrorResult(error); } void close(); /** * {@inheritDoc} */ @Override public void handleResult(final AsynchronousConnection connection) { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format( "Connection attempt succeeded: " + " currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } publishConnection(connection); } } /** * A pooled connection is passed to the client. It wraps an underlying * "pooled" connection obtained from the underlying factory and lasts until * the client application closes this connection. More specifically, pooled * connections are not actually stored in the internal queue. */ private final class PooledConnection implements AsynchronousConnection { // Connection event listeners registed against this pooled connection should // have the same life time as the pooled connection. 
private final List<ConnectionEventListener> listeners = new CopyOnWriteArrayList<ConnectionEventListener>(); private final AsynchronousConnection connection; private final AtomicBoolean isClosed = new AtomicBoolean(false); PooledConnection(final AsynchronousConnection connection) { this.connection = connection; } /** * {@inheritDoc} */ @Override public FutureResult<Void> abandon(final AbandonRequest request) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.abandon(request); } /** * {@inheritDoc} */ @Override public FutureResult<Result> add(final AddRequest request, final ResultHandler<? super Result> handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.add(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult<Result> add(final AddRequest request, final ResultHandler<? super Result> resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection .add(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public void addConnectionEventListener( final ConnectionEventListener listener) throws IllegalStateException, NullPointerException { Validator.ensureNotNull(listener); if (isClosed()) { throw new IllegalStateException(); } listeners.add(listener); } /** * {@inheritDoc} */ @Override public FutureResult<BindResult> bind(final BindRequest request, final ResultHandler<? 
super BindResult> handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.bind(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult<BindResult> bind(final BindRequest request, final ResultHandler<? super BindResult> resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.bind(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public void close() { if (!isClosed.compareAndSet(false, true)) { // Already closed. return; } // Don't put invalid connections back in the pool. if (connection.isValid()) { publishConnection(connection); } else { // The connection may have been disconnected by the remote server, but // the server may still be available. In order to avoid leaving pending // futures hanging indefinitely, we should try to reconnect immediately. // Close the dead connection. connection.close(); // Try to get a new connection to replace it. factory.getAsynchronousConnection(connectionResultHandler); if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG.warning(String.format( "Connection no longer valid. " + "currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } } } /** * {@inheritDoc} */ @Override public void close(final UnbindRequest request, final String reason) throws NullPointerException { close(); } /** * {@inheritDoc} */ @Override public FutureResult<CompareResult> compare(final CompareRequest request, final ResultHandler<? 
super CompareResult> handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.compare(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult<CompareResult> compare(final CompareRequest request, final ResultHandler<? super CompareResult> resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.compare(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public FutureResult<Result> delete(final DeleteRequest request, final ResultHandler<? super Result> handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.delete(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult<Result> delete(final DeleteRequest request, final ResultHandler<? super Result> resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.delete(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public <R extends ExtendedResult> FutureResult<R> extendedRequest( final ExtendedRequest<R> request, final ResultHandler<? super R> handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.extendedRequest(request, handler); } /** * {@inheritDoc} */ @Override public <R extends ExtendedResult> FutureResult<R> extendedRequest( final ExtendedRequest<R> request, final ResultHandler<? 
super R> resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.extendedRequest(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public Connection getSynchronousConnection() { return new SynchronousConnection(this); } /** * {@inheritDoc} */ @Override public boolean isClosed() { return isClosed.get(); } /** * {@inheritDoc} */ @Override public boolean isValid() { return connection.isValid() && !isClosed(); } /** * {@inheritDoc} */ @Override public FutureResult<Result> modify(final ModifyRequest request, final ResultHandler<? super Result> handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.modify(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult<Result> modify(final ModifyRequest request, final ResultHandler<? super Result> resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.modify(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public FutureResult<Result> modifyDN(final ModifyDNRequest request, final ResultHandler<? super Result> handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.modifyDN(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult<Result> modifyDN(final ModifyDNRequest request, final ResultHandler<? 
super Result> resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.modifyDN(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public FutureResult<SearchResultEntry> readEntry(final DN name, final Collection<String> attributeDescriptions, final ResultHandler<? super SearchResultEntry> resultHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.readEntry(name, attributeDescriptions, resultHandler); } /** * {@inheritDoc} */ @Override public void removeConnectionEventListener( final ConnectionEventListener listener) throws NullPointerException { Validator.ensureNotNull(listener); if (isClosed()) { throw new IllegalStateException(); } listeners.remove(listener); } /** * {@inheritDoc} */ @Override public FutureResult<Result> search(final SearchRequest request, final SearchResultHandler handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.search(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult<Result> search(final SearchRequest request, final SearchResultHandler resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.search(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public FutureResult<SearchResultEntry> searchSingleEntry( final SearchRequest request, final ResultHandler<? 
super SearchResultEntry> resultHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.searchSingleEntry(request, resultHandler); } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("PooledConnection("); builder.append(connection); builder.append(')'); return builder.toString(); } } /** * A queue element is either a pending connection request future awaiting an * {@code AsynchronousConnection} or it is an unused * {@code AsynchronousConnection} awaiting a connection request. */ private static final class QueueElement { private final Object value; QueueElement(final AsynchronousConnection connection) { this.value = connection; } QueueElement(final ResultHandler<? super AsynchronousConnection> handler) { this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler); } AsynchronousConnection getWaitingConnection() { if (value instanceof AsynchronousConnection) { return (AsynchronousConnection) value; } else { throw new IllegalStateException(); } } @SuppressWarnings("unchecked") AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture() { if (value instanceof AsynchronousFutureResult) { return (AsynchronousFutureResult<AsynchronousConnection>) value; } else { throw new IllegalStateException(); } } boolean isWaitingFuture() { return value instanceof AsynchronousFutureResult; } public String toString() { return String.valueOf(value); } } // Guarded by queue. 
private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>(); private final ConnectionFactory factory; private final int poolSize; private final Semaphore currentPoolSize; private final ResultHandler<AsynchronousConnection> connectionResultHandler = new ConnectionResultHandler(); /** * Creates a new connection pool which will maintain {@code poolSize} * connections created using the provided connection factory. * Obtains an asynchronous connection from this connection pool, potentially * opening a new connection if needed. * <p> * The returned {@code FutureResult} can be used to retrieve the pooled * asynchronous connection. Alternatively, if a {@code ResultHandler} is * provided, the handler will be notified when the pooled connection is * available and ready for use. * <p> * Closing the pooled connection will, depending on the connection pool * implementation, return the connection to this pool without closing it. * * @param factory * The connection factory to use for creating new connections. * @param poolSize * The maximum size of the connection pool. * @param handler * The completion handler, or {@code null} if no handler is to be * used. * @return A future which can be used to retrieve the pooled asynchronous * connection. * @throws IllegalStateException * If this connection pool has already been closed. */ ConnectionPool(final ConnectionFactory factory, final int poolSize) { this.factory = factory; this.poolSize = poolSize; this.currentPoolSize = new Semaphore(poolSize); } FutureResult<AsynchronousConnection> getAsynchronousConnection( ResultHandler<? super AsynchronousConnection> handler); /** * {@inheritDoc} * Obtains a connection from this connection pool, potentially opening a new * connection if needed. * <p> * Closing the pooled connection will, depending on the connection pool * implementation, return the connection to this pool without closing it. * * @return A pooled connection. 
* @throws ErrorResultException * If the connection request failed for some reason. * @throws InterruptedException * If the current thread was interrupted while waiting. * @throws IllegalStateException * If this connection pool has already been closed. */ @Override public FutureResult<AsynchronousConnection> getAsynchronousConnection( final ResultHandler<? super AsynchronousConnection> handler) { QueueElement holder; synchronized (queue) { if (queue.isEmpty() || queue.getFirst().isWaitingFuture()) { holder = new QueueElement(handler); queue.add(holder); } else { holder = queue.removeFirst(); } } if (!holder.isWaitingFuture()) { // There was a completed connection attempt. final AsynchronousConnection connection = holder.getWaitingConnection(); final PooledConnection pooledConnection = new PooledConnection(connection); if (handler != null) { handler.handleResult(pooledConnection); } return new CompletedFutureResult<AsynchronousConnection>(pooledConnection); } else { // Grow the pool if needed. final FutureResult<AsynchronousConnection> future = holder .getWaitingFuture(); if (!future.isDone() && currentPoolSize.tryAcquire()) { factory.getAsynchronousConnection(connectionResultHandler); } return future; } } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("ConnectionPool("); builder.append(String.valueOf(factory)); builder.append(','); builder.append(poolSize); builder.append(')'); return builder.toString(); } private void publishConnection(final AsynchronousConnection connection) { QueueElement holder; synchronized (queue) { if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) { holder = new QueueElement(connection); queue.add(holder); return; } else { holder = queue.removeFirst(); } } // There was waiting future, so close it. 
final PooledConnection pooledConnection = new PooledConnection(connection); holder.getWaitingFuture().handleResult(pooledConnection); } Connection getConnection() throws ErrorResultException, InterruptedException; } opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -23,6 +23,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS */ package org.forgerock.opendj.ldap; @@ -92,13 +93,13 @@ * @throws NullPointerException * If {@code factory} was {@code null}. */ public static ConnectionFactory newConnectionPool( public static ConnectionPool newFixedConnectionPool( final ConnectionFactory factory, final int poolSize) throws IllegalArgumentException, NullPointerException { Validator.ensureNotNull(factory); Validator.ensureTrue(poolSize >= 0, "negative pool size"); return new ConnectionPool(factory, poolSize); return new FixedConnectionPool(factory, poolSize); } opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
New file @@ -0,0 +1,944 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opendj3/legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opendj3/legal-notices/CDDLv1_0.txt. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS */ package org.forgerock.opendj.ldap; import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING; import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import org.forgerock.opendj.ldap.requests.*; import org.forgerock.opendj.ldap.responses.*; import com.forgerock.opendj.util.*; /** * A simple connection pool implementation which maintains a fixed number of * connections. */ final class FixedConnectionPool extends AbstractConnectionFactory implements ConnectionPool { /** * This result handler is invoked when an attempt to add a new connection to * the pool completes. 
*/ private final class ConnectionResultHandler implements ResultHandler<AsynchronousConnection> { /** * {@inheritDoc} */ @Override public void handleErrorResult(final ErrorResultException error) { // Connection attempt failed, so decrease the pool size. currentPoolSize.release(); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format("Connection attempt failed: " + error.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } QueueElement holder; synchronized (queue) { if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) { // No waiting futures. return; } else { holder = queue.removeFirst(); } } // There was waiting future, so close it. holder.getWaitingFuture().handleErrorResult(error); } /** * {@inheritDoc} */ @Override public void handleResult(final AsynchronousConnection connection) { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format( "Connection attempt succeeded: " + " currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } publishConnection(connection); } } /** * A pooled connection is passed to the client. It wraps an underlying * "pooled" connection obtained from the underlying factory and lasts until * the client application closes this connection. More specifically, pooled * connections are not actually stored in the internal queue. */ private final class PooledConnection implements AsynchronousConnection { // Connection event listeners registed against this pooled connection should // have the same life time as the pooled connection. 
private final List<ConnectionEventListener> listeners =
      new CopyOnWriteArrayList<ConnectionEventListener>();

  // The underlying connection which every operation is forwarded to.
  private final AsynchronousConnection connection;

  // Set exactly once, by the first call to close() on this wrapper.
  private final AtomicBoolean isClosed = new AtomicBoolean(false);



  /**
   * Wraps the provided underlying connection.
   *
   * @param connection
   *          The underlying connection to delegate to.
   */
  PooledConnection(final AsynchronousConnection connection)
  {
    this.connection = connection;
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<Void> abandon(final AbandonRequest request)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.abandon(request);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<Result> add(final AddRequest request,
      final ResultHandler<? super Result> handler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.add(request, handler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<Result> add(final AddRequest request,
      final ResultHandler<? super Result> resultHandler,
      final IntermediateResponseHandler intermediateResponseHandler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection
        .add(request, resultHandler, intermediateResponseHandler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * The listener is recorded in this wrapper's own listener list; it is not
   * registered with the wrapped connection.
   */
  @Override
  public void addConnectionEventListener(
      final ConnectionEventListener listener) throws IllegalStateException,
      NullPointerException
  {
    Validator.ensureNotNull(listener);
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    listeners.add(listener);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<BindResult> bind(final BindRequest request,
      final ResultHandler<? super BindResult> handler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.bind(request, handler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<BindResult> bind(final BindRequest request,
      final ResultHandler<? super BindResult> resultHandler,
      final IntermediateResponseHandler intermediateResponseHandler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.bind(request, resultHandler,
        intermediateResponseHandler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Only the first call has any effect. A still-valid underlying connection
   * is handed to {@code publishConnection}; an invalid one is closed and a
   * replacement connection is requested from the factory.
   */
  @Override
  public void close()
  {
    // The atomic flip guarantees the release logic below runs at most once
    // per pooled connection, even if close() is called concurrently.
    if (!isClosed.compareAndSet(false, true))
    {
      // Already closed.
      return;
    }

    // Don't put invalid connections back in the pool.
    if (connection.isValid())
    {
      publishConnection(connection);
    }
    else
    {
      // The connection may have been disconnected by the remote server, but
      // the server may still be available. In order to avoid leaving pending
      // futures hanging indefinitely, we should try to reconnect immediately.

      // Close the dead connection.
      connection.close();

      // Try to get a new connection to replace it.
      // No permit is acquired or released here: the replacement request
      // is issued directly against the factory.
      factory.getAsynchronousConnection(connectionResultHandler);

      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG.warning(String.format(
            "Connection no longer valid. "
                + "currentPoolSize=%d, poolSize=%d",
            poolSize - currentPoolSize.availablePermits(), poolSize));
      }
    }
  }



  /**
   * {@inheritDoc}
   * <p>
   * The unbind request and reason are ignored; this simply calls
   * {@link #close()}.
   */
  @Override
  public void close(final UnbindRequest request, final String reason)
      throws NullPointerException
  {
    close();
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<CompareResult> compare(final CompareRequest request,
      final ResultHandler<? super CompareResult> handler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.compare(request, handler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<CompareResult> compare(final CompareRequest request,
      final ResultHandler<? super CompareResult> resultHandler,
      final IntermediateResponseHandler intermediateResponseHandler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.compare(request, resultHandler,
        intermediateResponseHandler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<Result> delete(final DeleteRequest request,
      final ResultHandler<? super Result> handler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.delete(request, handler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<Result> delete(final DeleteRequest request,
      final ResultHandler<? super Result> resultHandler,
      final IntermediateResponseHandler intermediateResponseHandler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.delete(request, resultHandler,
        intermediateResponseHandler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public <R extends ExtendedResult> FutureResult<R> extendedRequest(
      final ExtendedRequest<R> request,
      final ResultHandler<? super R> handler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.extendedRequest(request, handler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public <R extends ExtendedResult> FutureResult<R> extendedRequest(
      final ExtendedRequest<R> request,
      final ResultHandler<? super R> resultHandler,
      final IntermediateResponseHandler intermediateResponseHandler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.extendedRequest(request, resultHandler,
        intermediateResponseHandler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Returns a new synchronous view layered on this pooled connection.
   */
  @Override
  public Connection getSynchronousConnection()
  {
    return new SynchronousConnection(this);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Reports the state of this wrapper only, not of the wrapped connection.
   */
  @Override
  public boolean isClosed()
  {
    return isClosed.get();
  }



  /**
   * {@inheritDoc}
   * <p>
   * Valid only while the wrapped connection is valid and this wrapper has
   * not been closed.
   */
  @Override
  public boolean isValid()
  {
    return connection.isValid() && !isClosed();
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<Result> modify(final ModifyRequest request,
      final ResultHandler<? super Result> handler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.modify(request, handler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<Result> modify(final ModifyRequest request,
      final ResultHandler<? super Result> resultHandler,
      final IntermediateResponseHandler intermediateResponseHandler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.modify(request, resultHandler,
        intermediateResponseHandler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<Result> modifyDN(final ModifyDNRequest request,
      final ResultHandler<? super Result> handler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.modifyDN(request, handler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<Result> modifyDN(final ModifyDNRequest request,
      final ResultHandler<? super Result> resultHandler,
      final IntermediateResponseHandler intermediateResponseHandler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.modifyDN(request, resultHandler,
        intermediateResponseHandler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<SearchResultEntry> readEntry(final DN name,
      final Collection<String> attributeDescriptions,
      final ResultHandler<? super SearchResultEntry> resultHandler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.readEntry(name, attributeDescriptions, resultHandler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Removes the listener from this wrapper's own listener list. Note that an
   * {@code IllegalStateException} is thrown if this pooled connection has
   * already been closed, although it is not listed in the throws clause.
   */
  @Override
  public void removeConnectionEventListener(
      final ConnectionEventListener listener) throws NullPointerException
  {
    Validator.ensureNotNull(listener);
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    listeners.remove(listener);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<Result> search(final SearchRequest request,
      final SearchResultHandler handler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.search(request, handler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<Result> search(final SearchRequest request,
      final SearchResultHandler resultHandler,
      final IntermediateResponseHandler intermediateResponseHandler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.search(request, resultHandler,
        intermediateResponseHandler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the wrapped connection unless this pooled connection has
   * already been closed.
   */
  @Override
  public FutureResult<SearchResultEntry> searchSingleEntry(
      final SearchRequest request,
      final ResultHandler<? super SearchResultEntry> resultHandler)
      throws UnsupportedOperationException, IllegalStateException,
      NullPointerException
  {
    if (isClosed())
    {
      throw new IllegalStateException();
    }
    return connection.searchSingleEntry(request, resultHandler);
  }



  /**
   * {@inheritDoc}
   * <p>
   * Renders as {@code PooledConnection(<wrapped connection>)}.
   */
  @Override
  public String toString()
  {
    final StringBuilder builder = new StringBuilder();
    builder.append("PooledConnection(");
    builder.append(connection);
    builder.append(')');
    return builder.toString();
  }
}



/**
 * A queue element is either a pending connection request future awaiting an
 * {@code AsynchronousConnection} or it is an unused
 * {@code AsynchronousConnection} awaiting a connection request.
 */
private static final class QueueElement
{
  // Either an AsynchronousConnection or an AsynchronousFutureResult; the
  // accessors below discriminate on the runtime type.
  private final Object value;



  /**
   * Creates an element holding an unused connection.
   *
   * @param connection
   *          The idle connection to hold.
   */
  QueueElement(final AsynchronousConnection connection)
  {
    this.value = connection;
  }



  /**
   * Creates an element holding a new pending future which wraps the
   * caller's handler.
   *
   * @param handler
   *          The handler passed to the new future.
   */
  QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
  {
    this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
  }



  /**
   * Returns the string form of the held connection or future.
   */
  @Override
  public String toString()
  {
    return String.valueOf(value);
  }



  /**
   * Returns the held connection.
   *
   * @return The idle connection.
   * @throws IllegalStateException
   *           If this element does not hold a connection.
   */
  AsynchronousConnection getWaitingConnection()
  {
    if (value instanceof AsynchronousConnection)
    {
      return (AsynchronousConnection) value;
    }
    else
    {
      throw new IllegalStateException();
    }
  }



  /**
   * Returns the held pending future.
   *
   * @return The waiting future.
   * @throws IllegalStateException
   *           If this element does not hold a future.
   */
  @SuppressWarnings("unchecked")
  AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
  {
    if (value instanceof AsynchronousFutureResult)
    {
      return (AsynchronousFutureResult<AsynchronousConnection>) value;
    }
    else
    {
      throw new IllegalStateException();
    }
  }



  /**
   * Indicates whether this element holds a pending future rather than an
   * idle connection.
   */
  boolean isWaitingFuture()
  {
    return value instanceof AsynchronousFutureResult;
  }
}



// Guarded by queue.
private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();

// Guarded by queue.
private boolean isClosed = false; private final ConnectionFactory factory; private final int poolSize; private final Semaphore currentPoolSize; private final ResultHandler<AsynchronousConnection> connectionResultHandler = new ConnectionResultHandler(); /** * Creates a new connection pool which will maintain {@code poolSize} * connections created using the provided connection factory. * * @param factory * The connection factory to use for creating new connections. * @param poolSize * The maximum size of the connection pool. */ FixedConnectionPool(final ConnectionFactory factory, final int poolSize) { this.factory = factory; this.poolSize = poolSize; this.currentPoolSize = new Semaphore(poolSize); } /** * {@inheritDoc} */ @Override public void close() { final LinkedList<AsynchronousConnection> idleConnections; synchronized (queue) { if (isClosed) { return; } isClosed = true; // Remove any connections which are waiting in the queue as these can be // closed immediately. idleConnections = new LinkedList<AsynchronousConnection>(); while (!queue.isEmpty() && !queue.getFirst().isWaitingFuture()) { final QueueElement holder = queue.removeFirst(); idleConnections.add(holder.getWaitingConnection()); } } if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format( "Connection pool is closing: currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } // Close the idle connections. for (final AsynchronousConnection connection : idleConnections) { closeConnection(connection); } } /** * {@inheritDoc} */ @Override public FutureResult<AsynchronousConnection> getAsynchronousConnection( final ResultHandler<? 
super AsynchronousConnection> handler) { QueueElement holder; synchronized (queue) { if (isClosed) { throw new IllegalStateException("FixedConnectionPool is already closed"); } if (queue.isEmpty() || queue.getFirst().isWaitingFuture()) { holder = new QueueElement(handler); queue.add(holder); } else { holder = queue.removeFirst(); } } if (!holder.isWaitingFuture()) { // There was a completed connection attempt. final AsynchronousConnection connection = holder.getWaitingConnection(); final PooledConnection pooledConnection = new PooledConnection(connection); if (handler != null) { handler.handleResult(pooledConnection); } return new CompletedFutureResult<AsynchronousConnection>(pooledConnection); } else { // Grow the pool if needed. final FutureResult<AsynchronousConnection> future = holder .getWaitingFuture(); if (!future.isDone() && currentPoolSize.tryAcquire()) { factory.getAsynchronousConnection(connectionResultHandler); } return future; } } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("FixedConnectionPool("); builder.append(String.valueOf(factory)); builder.append(','); builder.append(poolSize); builder.append(')'); return builder.toString(); } /** * Provide a finalizer because connection pools are expensive resources to * accidentally leave around. Also, since they won't be created all that * frequently, there's little risk of overloading the finalizer. */ @Override protected void finalize() throws Throwable { close(); } private void closeConnection(final AsynchronousConnection connection) { // The connection will be closed, so decrease the pool size. 
currentPoolSize.release(); connection.close(); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format( "Closing connection because connection pool is closing: " + " currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } } private void publishConnection(final AsynchronousConnection connection) { final QueueElement holder; boolean connectionPoolIsClosing = false; synchronized (queue) { if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) { if (isClosed) { connectionPoolIsClosing = true; holder = null; } else { holder = new QueueElement(connection); queue.add(holder); return; } } else { connectionPoolIsClosing = isClosed; holder = queue.removeFirst(); } } // There was waiting future, so complete it. if (connectionPoolIsClosing) { closeConnection(connection); if (holder != null) { final ErrorResultException e = ErrorResultException.newErrorResult( ResultCode.CLIENT_SIDE_USER_CANCELLED, ERR_CONNECTION_POOL_CLOSING .get(toString()).toString()); holder.getWaitingFuture().handleErrorResult(e); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format( "Connection attempt failed: " + e.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } } } else { final PooledConnection pooledConnection = new PooledConnection(connection); holder.getWaitingFuture().handleResult(pooledConnection); } } } opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
@@ -1385,3 +1385,5 @@ WARN_ATTR_SYNTAX_DSR_INVALID_SUPERIOR_RULE=The definition for \ the DIT structure rule "%s" declared a superior rule "%s" which has been \ removed from the schema because it is invalid ERR_CONNECTION_POOL_CLOSING=No connection could be obtained from connection \ pool "%s" because it is closing opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -33,11 +33,14 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import javax.net.ssl.SSLContext; @@ -48,11 +51,14 @@ import org.forgerock.opendj.ldap.responses.SearchResultEntry; import org.forgerock.opendj.ldap.schema.Schema; import org.forgerock.opendj.ldap.schema.SchemaBuilder; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import com.forgerock.opendj.util.CompletedFutureResult; import com.forgerock.opendj.util.StaticUtils; @@ -205,7 +211,7 @@ "online"); // Connection pools. factories[7][0] = Connections.newConnectionPool(onlineServer, 10); factories[7][0] = Connections.newFixedConnectionPool(onlineServer, 10); // Round robin. factories[8][0] = Connections @@ -220,9 +226,10 @@ .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList( offlineServer1, offlineServer2, onlineServer))); factories[13][0] = Connections .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList( Connections.newConnectionPool(offlineServer1, 10), Connections.newConnectionPool(onlineServer, 10)))); .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays .<ConnectionFactory> asList( Connections.newFixedConnectionPool(offlineServer1, 10), Connections.newFixedConnectionPool(onlineServer, 10)))); // Fail-over. 
factories[14][0] = Connections @@ -237,11 +244,12 @@ .newLoadBalancer(new FailoverLoadBalancingAlgorithm(Arrays.asList( offlineServer1, offlineServer2, onlineServer))); factories[19][0] = Connections .newLoadBalancer(new FailoverLoadBalancingAlgorithm(Arrays.asList( Connections.newConnectionPool(offlineServer1, 10), Connections.newConnectionPool(onlineServer, 10)))); .newLoadBalancer(new FailoverLoadBalancingAlgorithm(Arrays .<ConnectionFactory> asList( Connections.newFixedConnectionPool(offlineServer1, 10), Connections.newFixedConnectionPool(onlineServer, 10)))); factories[20][0] = Connections.newConnectionPool(onlineServer, 10); factories[20][0] = Connections.newFixedConnectionPool(onlineServer, 10); return factories; } @@ -367,4 +375,119 @@ connection.close(); } } /** * Tests connection pool closure. * * @throws Exception If an unexpected exception occurred. */ @SuppressWarnings("unchecked") @Test public void testConnectionPoolClose() throws Exception { // We'll use a pool of 4 connections. final int SIZE = 4; // Count number of real connections which are open. final AtomicInteger realConnectionCount = new AtomicInteger(); final boolean[] realConnectionIsClosed = new boolean[SIZE]; Arrays.fill(realConnectionIsClosed, true); // Mock underlying connection factory which always succeeds. final ConnectionFactory mockFactory = mock(ConnectionFactory.class); when(mockFactory.getAsynchronousConnection(any(ResultHandler.class))) .thenAnswer(new Answer<FutureResult<AsynchronousConnection>>() { public FutureResult<AsynchronousConnection> answer( InvocationOnMock invocation) throws Throwable { // Update state. final int connectionID = realConnectionCount.getAndIncrement(); realConnectionIsClosed[connectionID] = false; // Mock connection decrements counter on close. 
AsynchronousConnection mockConnection = mock(AsynchronousConnection.class); doAnswer(new Answer<Void>() { public Void answer(InvocationOnMock invocation) throws Throwable { realConnectionCount.decrementAndGet(); realConnectionIsClosed[connectionID] = true; return null; } }).when(mockConnection).close(); when(mockConnection.isValid()).thenReturn(true); when(mockConnection.toString()).thenReturn("Mock connection " + connectionID); // Excecute handler and return future. ResultHandler<? super AsynchronousConnection> handler = (ResultHandler<? super AsynchronousConnection>) invocation.getArguments()[0]; if (handler != null) { handler.handleResult(mockConnection); } return new CompletedFutureResult<AsynchronousConnection>( mockConnection); } }); ConnectionPool pool = Connections.newFixedConnectionPool(mockFactory, SIZE); Connection[] pooledConnections = new Connection[SIZE]; for (int i = 0; i < SIZE; i++) { pooledConnections[i] = pool.getConnection(); } // Pool is fully utilized. assertThat(realConnectionCount.get()).isEqualTo(SIZE); assertThat(pooledConnections[0].isClosed()).isFalse(); assertThat(pooledConnections[1].isClosed()).isFalse(); assertThat(pooledConnections[2].isClosed()).isFalse(); assertThat(pooledConnections[3].isClosed()).isFalse(); assertThat(realConnectionIsClosed[0]).isFalse(); assertThat(realConnectionIsClosed[1]).isFalse(); assertThat(realConnectionIsClosed[2]).isFalse(); assertThat(realConnectionIsClosed[3]).isFalse(); // Release two connections. 
pooledConnections[0].close(); pooledConnections[1].close(); assertThat(realConnectionCount.get()).isEqualTo(4); assertThat(pooledConnections[0].isClosed()).isTrue(); assertThat(pooledConnections[1].isClosed()).isTrue(); assertThat(pooledConnections[2].isClosed()).isFalse(); assertThat(pooledConnections[3].isClosed()).isFalse(); assertThat(realConnectionIsClosed[0]).isFalse(); assertThat(realConnectionIsClosed[1]).isFalse(); assertThat(realConnectionIsClosed[2]).isFalse(); assertThat(realConnectionIsClosed[3]).isFalse(); // Close the pool closing the two connections immediately. pool.close(); assertThat(realConnectionCount.get()).isEqualTo(2); assertThat(pooledConnections[0].isClosed()).isTrue(); assertThat(pooledConnections[1].isClosed()).isTrue(); assertThat(pooledConnections[2].isClosed()).isFalse(); assertThat(pooledConnections[3].isClosed()).isFalse(); assertThat(realConnectionIsClosed[0]).isTrue(); assertThat(realConnectionIsClosed[1]).isTrue(); assertThat(realConnectionIsClosed[2]).isFalse(); assertThat(realConnectionIsClosed[3]).isFalse(); // Release two remaining connections and check that they get closed. pooledConnections[2].close(); pooledConnections[3].close(); assertThat(realConnectionCount.get()).isEqualTo(0); assertThat(pooledConnections[0].isClosed()).isTrue(); assertThat(pooledConnections[1].isClosed()).isTrue(); assertThat(pooledConnections[2].isClosed()).isTrue(); assertThat(pooledConnections[3].isClosed()).isTrue(); assertThat(realConnectionIsClosed[0]).isTrue(); assertThat(realConnectionIsClosed[1]).isTrue(); assertThat(realConnectionIsClosed[2]).isTrue(); assertThat(realConnectionIsClosed[3]).isTrue(); } } opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
@@ -604,10 +604,11 @@ // Round robin. final ConnectionFactory loadBalancer = Connections .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList( Connections.newConnectionPool(offlineServer1, 10), Connections.newConnectionPool(offlineServer2, 10), Connections.newConnectionPool(onlineServer, 10)))); .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays .<ConnectionFactory> asList( Connections.newFixedConnectionPool(offlineServer1, 10), Connections.newFixedConnectionPool(offlineServer2, 10), Connections.newFixedConnectionPool(onlineServer, 10)))); final MockServerConnection proxyServerConnection = new MockServerConnection(); final MockServerConnectionFactory proxyServerConnectionFactory = new MockServerConnectionFactory( @@ -699,10 +700,11 @@ // Round robin. final ConnectionFactory loadBalancer = Connections .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList( Connections.newConnectionPool(offlineServer1, 10), Connections.newConnectionPool(offlineServer2, 10), Connections.newConnectionPool(onlineServer, 10)))); .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays .<ConnectionFactory> asList( Connections.newFixedConnectionPool(offlineServer1, 10), Connections.newFixedConnectionPool(offlineServer2, 10), Connections.newFixedConnectionPool(onlineServer, 10)))); final MockServerConnection proxyServerConnection = new MockServerConnection() {