/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 */

package org.opends.sdk;



import java.util.Collection;
import java.util.Stack;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;

import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.opends.sdk.schema.Schema;

import com.sun.opends.sdk.util.AbstractFutureResult;
import com.sun.opends.sdk.util.CompletedFutureResult;
import com.sun.opends.sdk.util.FutureResultTransformer;
import com.sun.opends.sdk.util.StaticUtils;



/**
 * A simple connection pool implementation.
 * <p>
 * Connections are created on demand through the wrapped connection
 * factory until {@code poolSize} connections exist. Callers receive a
 * wrapper whose {@code close()} returns the underlying connection to
 * the pool (or hands it straight to a waiting caller) instead of
 * closing it. All pool state is mutated while holding {@code lock}.
 * <p>
 * NOTE(review): the type arguments of the SDK types used here
 * ({@code ResultHandler}, {@code FutureResult},
 * {@code FutureResultTransformer}, ...) are not present in this copy
 * of the file and have been left exactly as found - confirm them
 * against the SDK declarations.
 */
final class ConnectionPool extends AbstractConnectionFactory
{
  // Factory used to create the underlying connections.
  private final ConnectionFactory connectionFactory;

  // Number of live connections owned by the pool (idle + checked out +
  // being established). Only modified while holding lock.
  private volatile int numConnections;

  // Maximum number of connections the pool may own.
  private final int poolSize;

  // Idle connections, most recently released on top.
  // FIXME: should use a better collection than this - CLQ?
  private final Stack<AsynchronousConnection> pool;

  // Callers waiting for a connection because the pool was exhausted.
  private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;

  // Guards pool, numConnections and the wrappers' release state.
  private final Object lock = new Object();



  /**
   * Future returned while a brand new connection is being established;
   * wraps the new connection in a pooled wrapper once it is available.
   * <p>
   * NOTE(review): if the connection attempt fails, nothing visible here
   * gives the reserved {@code numConnections} slot back - confirm
   * whether the transformer offers an error hook that should decrement
   * it.
   */
  private final class FutureNewConnection extends FutureResultTransformer
  {
    private FutureNewConnection(ResultHandler handler)
    {
      super(handler);
    }



    protected AsynchronousConnection transformResult(
        AsynchronousConnection result) throws ErrorResultException
    {
      return new PooledConnectionWapper(result);
    }
  }



  /**
   * Wrapper handed out to pool users. Every operation is delegated to
   * the underlying connection until {@link #close()} is called, after
   * which the wrapper is unusable and the underlying connection goes
   * back to the pool.
   */
  private final class PooledConnectionWapper implements
      AsynchronousConnection, ConnectionEventListener
  {
    // Underlying connection; null once this wrapper has been released.
    private AsynchronousConnection connection;

    // Set once connectionErrorOccurred has removed the connection from
    // the pool's accounting, so that close() does not count it twice.
    private boolean errorReported;



    private PooledConnectionWapper(AsynchronousConnection connection)
    {
      this.connection = connection;
      this.connection.addConnectionEventListener(this);
    }



    /**
     * Returns the underlying connection, reading the field exactly once
     * so that a concurrent {@link #close()} cannot turn the null check
     * into a later {@code NullPointerException}.
     *
     * @return The underlying connection, never {@code null}.
     * @throws IllegalStateException
     *           If this wrapper has already been released to the pool.
     */
    private AsynchronousConnection delegate()
        throws IllegalStateException
    {
      final AsynchronousConnection current = connection;
      if (current == null)
      {
        throw new IllegalStateException();
      }
      return current;
    }



    public void abandon(AbandonRequest request)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      delegate().abandon(request);
    }



    public FutureResult add(AddRequest request, ResultHandler handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return delegate().add(request, handler);
    }



    public FutureResult bind(BindRequest request, ResultHandler handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return delegate().bind(request, handler);
    }



    /**
     * Releases the underlying connection back to the pool. Calling this
     * method more than once has no further effect.
     * <p>
     * A connection which is closed, or which has reported an error, is
     * dropped instead of being pooled. Otherwise it is handed to the
     * first waiter that has not been cancelled, or pushed onto the idle
     * stack when nobody is waiting.
     */
    public void close()
    {
      synchronized (lock)
      {
        final AsynchronousConnection released = connection;
        if (released == null)
        {
          // Already released.
          return;
        }

        // Null out the underlying connection to prevent further use.
        connection = null;

        // Don't put dead connections back in the pool.
        // NOTE(review): a connection that reported an error but is not
        // closed is dropped here without being closed - confirm whether
        // it should be closed explicitly.
        if (errorReported || released.isClosed())
        {
          if (!errorReported)
          {
            // The error callback never ran for this connection, so it
            // is still counted: free its slot.
            numConnections--;
          }
          logPoolState("Dead connection released to pool.");
          return;
        }

        // This wrapper is finished with the connection: stop listening
        // so that later events are not attributed to a released
        // wrapper.
        released.removeConnectionEventListener(this);

        // See if there are waiters pending.
        FuturePooledConnection waiter;
        while ((waiter = pendingFutures.poll()) != null)
        {
          final PooledConnectionWapper handOff =
              new PooledConnectionWapper(released);
          waiter.handleResult(handOff);
          if (!waiter.isCancelled())
          {
            // The future was not cancelled and the connection was
            // accepted.
            logPoolState("Connection released to pool and directly "
                + "given to waiter.");
            return;
          }

          // The waiter was cancelled: retire the wrapper created for it
          // so that it neither keeps listening nor shares the
          // connection with its next user.
          handOff.connection = null;
          released.removeConnectionEventListener(handOff);
        }

        // No waiters. Put back in pool.
        pool.push(released);
        logPoolState("Connection released to pool.");
      }
    }



    public void close(UnbindRequest request, String reason)
        throws NullPointerException
    {
      close();
    }



    public FutureResult compare(CompareRequest request,
        ResultHandler handler) throws UnsupportedOperationException,
        IllegalStateException, NullPointerException
    {
      return delegate().compare(request, handler);
    }



    public FutureResult delete(DeleteRequest request,
        ResultHandler handler) throws UnsupportedOperationException,
        IllegalStateException, NullPointerException
    {
      return delegate().delete(request, handler);
    }



    public FutureResult extendedRequest(ExtendedRequest request,
        ResultHandler handler) throws UnsupportedOperationException,
        IllegalStateException, NullPointerException
    {
      return delegate().extendedRequest(request, handler);
    }



    public FutureResult modify(ModifyRequest request,
        ResultHandler handler) throws UnsupportedOperationException,
        IllegalStateException, NullPointerException
    {
      return delegate().modify(request, handler);
    }



    public FutureResult modifyDN(ModifyDNRequest request,
        ResultHandler handler) throws UnsupportedOperationException,
        IllegalStateException, NullPointerException
    {
      return delegate().modifyDN(request, handler);
    }



    public FutureResult search(SearchRequest request,
        ResultHandler resultHandler,
        SearchResultHandler searchResulthandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return delegate().search(request, resultHandler,
          searchResulthandler);
    }



    /**
     * {@inheritDoc}
     */
    public FutureResult readEntry(DN name,
        Collection attributeDescriptions, ResultHandler resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return delegate().readEntry(name, attributeDescriptions,
          resultHandler);
    }



    /**
     * {@inheritDoc}
     */
    public FutureResult searchSingleEntry(SearchRequest request,
        ResultHandler resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return delegate().searchSingleEntry(request, resultHandler);
    }



    /**
     * {@inheritDoc}
     */
    public FutureResult readRootDSE(ResultHandler handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      return delegate().readRootDSE(handler);
    }



    /**
     * {@inheritDoc}
     */
    public FutureResult readSchemaForEntry(DN name,
        ResultHandler handler) throws UnsupportedOperationException,
        IllegalStateException
    {
      return delegate().readSchemaForEntry(name, handler);
    }



    /**
     * {@inheritDoc}
     */
    public FutureResult readSchema(DN name, ResultHandler handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      return delegate().readSchema(name, handler);
    }



    public void addConnectionEventListener(
        ConnectionEventListener listener) throws IllegalStateException,
        NullPointerException
    {
      // Only the released-state check is performed; the listener is not
      // recorded anywhere.
      // NOTE(review): listeners passed here are never notified -
      // confirm whether that is intended.
      delegate();
    }



    public void removeConnectionEventListener(
        ConnectionEventListener listener) throws NullPointerException
    {
      // Mirrors addConnectionEventListener: state check only.
      delegate();
    }



    /**
     * {@inheritDoc}
     */
    public boolean isClosed()
    {
      return connection == null;
    }



    public void connectionReceivedUnsolicitedNotification(
        GenericExtendedResult notification)
    {
      // Ignore
    }



    /**
     * Removes the failed connection from the pool's accounting. Events
     * arriving after this wrapper has been released, or after an error
     * has already been recorded, are ignored so that a connection is
     * never counted out twice.
     */
    public void connectionErrorOccurred(
        boolean isDisconnectNotification, ErrorResultException error)
    {
      synchronized (lock)
      {
        final AsynchronousConnection failed = connection;
        if (failed == null || errorReported)
        {
          return;
        }
        errorReported = true;

        // Remove this connection from the pool if its in there. The
        // pool stores underlying connections, not wrappers.
        pool.remove(failed);
        numConnections--;
        failed.removeConnectionEventListener(this);

        // FIXME: should still close the connection, but we need to be
        // careful that users of the pooled connection get a sensible
        // error if they continue to use it (i.e. not an NPE or ISE).

        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINEST))
        {
          // The error text is passed on as data, never as part of a
          // format string, so a '%' in it cannot break formatting.
          logPoolState("Connection error occured: "
              + error.getMessage());
        }
      }
    }
  }



  // Future used for waiting for pooled connections to become available.
  private static final class FuturePooledConnection extends
      AbstractFutureResult
  {
    private FuturePooledConnection(ResultHandler handler)
    {
      super(handler);
    }



    /**
     * {@inheritDoc}
     */
    public int getRequestID()
    {
      return -1;
    }
  }



  /**
   * Creates a new connection pool which will maintain {@code poolSize}
   * connections created using the provided connection factory.
   *
   * @param connectionFactory
   *          The connection factory to use for creating new
   *          connections.
   * @param poolSize
   *          The maximum size of the connection pool.
   */
  ConnectionPool(ConnectionFactory connectionFactory, int poolSize)
  {
    this.connectionFactory = connectionFactory;
    this.poolSize = poolSize;
    this.pool = new Stack<AsynchronousConnection>();
    this.pendingFutures =
        new ConcurrentLinkedQueue<FuturePooledConnection>();
  }



  /**
   * Writes a debug record describing a pool event followed by the
   * current pool counters. The record is emitted at {@code FINEST}, so
   * that is the level tested before any formatting is done.
   * <p>
   * The value labelled {@code poolSize} in the message is the number of
   * idle connections currently held, not the configured maximum.
   *
   * @param event
   *          Text describing what happened; used as data, not as a
   *          format string.
   */
  private void logPoolState(String event)
  {
    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINEST))
    {
      StaticUtils.DEBUG_LOG.finest(String.format(
          "%s numConnections: %d, poolSize: %d, pendingFutures: %d",
          event, numConnections, pool.size(), pendingFutures.size()));
    }
  }



  /**
   * Obtains a pooled connection.
   * <p>
   * An idle connection is returned immediately when one is available.
   * Otherwise a new connection is requested from the factory while the
   * pool is below {@code poolSize}; failing that, the caller is queued
   * until another user releases a connection.
   * <p>
   * NOTE(review): the handler is invoked while the pool lock is held.
   *
   * @param handler
   *          Handler to notify when the connection is available, may be
   *          {@code null}.
   * @return A future representing the pooled connection.
   */
  public FutureResult getAsynchronousConnection(ResultHandler handler)
  {
    synchronized (lock)
    {
      // Check to see if we have a connection in the pool
      while (!pool.isEmpty())
      {
        final AsynchronousConnection idle = pool.pop();
        if (idle.isClosed())
        {
          // The connection was closed while idle: release its slot and
          // try the next one rather than handing out a dead
          // connection.
          numConnections--;
          continue;
        }

        logPoolState("Connection aquired from pool.");
        final PooledConnectionWapper pooledConnection =
            new PooledConnectionWapper(idle);
        if (handler != null)
        {
          handler.handleResult(pooledConnection);
        }
        return new CompletedFutureResult(pooledConnection);
      }

      // Pool was empty. Maybe a new connection if pool size is not
      // reached
      if (numConnections < poolSize)
      {
        // We can create a new connection.
        numConnections++;
        final FutureNewConnection future =
            new FutureNewConnection(handler);
        future.setFutureResult(connectionFactory
            .getAsynchronousConnection(future));
        logPoolState("New connection established and aquired.");
        return future;
      }

      // Pool is full so wait for a connection to become available.
      final FuturePooledConnection future =
          new FuturePooledConnection(handler);
      pendingFutures.add(future);
      logPoolState("No connections available. Wait-listed");
      return future;
    }
  }
}