/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009 Sun Microsystems, Inc. */ package org.opends.sdk; import java.util.Collection; import java.util.Stack; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Level; import org.opends.sdk.requests.*; import org.opends.sdk.responses.*; import org.opends.sdk.schema.Schema; import com.sun.opends.sdk.util.StaticUtils; /** * A simple connection pool implementation. */ public class ConnectionPool extends AbstractConnectionFactory { private final ConnectionFactory connectionFactory; private volatile int numConnections; private final int poolSize; // FIXME: should use a better collection than this - CLQ? private final Stack pool; private final ConcurrentLinkedQueue> pendingFutures; private final Object lock = new Object(); private final class PooledConnectionWapper implements AsynchronousConnection, ConnectionEventListener { private AsynchronousConnection connection; private PooledConnectionWapper(AsynchronousConnection connection) { this.connection = connection; this.connection.addConnectionEventListener(this); } public void abandon(AbandonRequest request) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (connection == null) { throw new IllegalStateException(); } connection.abandon(request); } public

ResultFuture add(AddRequest request, ResultHandler handler, P p) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (connection == null) { throw new IllegalStateException(); } return connection.add(request, handler, p); } public

ResultFuture bind(BindRequest request, ResultHandler handler, P p) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (connection == null) { throw new IllegalStateException(); } return connection.bind(request, handler, p); } public void close() { synchronized (lock) { try { // Don't put closed connections back in the pool. if (connection.isClosed()) { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "Dead connection released to pool. " + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); } return; } // See if there waiters pending PendingConnectionFuture future = pendingFutures.poll(); if (future != null) { PooledConnectionWapper pooledConnection = new PooledConnectionWapper( connection); future.connection(pooledConnection); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "Connection released to pool and directly " + "given to waiter. numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } return; } // No waiters. Put back in pool. pool.push(connection); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "Connection released to pool. " + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); } } finally { // Null out the underlying connection to prevent further use. connection = null; } } } public void close(UnbindRequest request, String reason) throws NullPointerException { close(); } public

ResultFuture compare( CompareRequest request, ResultHandler handler, P p) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (connection == null) { throw new IllegalStateException(); } return connection.compare(request, handler, p); } public

ResultFuture delete(DeleteRequest request, ResultHandler handler, P p) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (connection == null) { throw new IllegalStateException(); } return connection.delete(request, handler, p); } public ResultFuture extendedRequest( ExtendedRequest request, ResultHandler handler, P p) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (connection == null) { throw new IllegalStateException(); } return connection.extendedRequest(request, handler, p); } public

ResultFuture modify(ModifyRequest request, ResultHandler handler, P p) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (connection == null) { throw new IllegalStateException(); } return connection.modify(request, handler, p); } public

ResultFuture modifyDN(ModifyDNRequest request, ResultHandler handler, P p) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (connection == null) { throw new IllegalStateException(); } return connection.modifyDN(request, handler, p); } public

ResultFuture search(SearchRequest request, ResultHandler resultHandler, SearchResultHandler

searchResulthandler, P p) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (connection == null) { throw new IllegalStateException(); } return connection.search(request, resultHandler, searchResulthandler, p); } /** * {@inheritDoc} */ public

ResultFuture readEntry(DN name, Collection attributeDescriptions, ResultHandler resultHandler, P p) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (connection == null) { throw new IllegalStateException(); } return connection.readEntry(name, attributeDescriptions, resultHandler, p); } /** * {@inheritDoc} */ public

ResultFuture searchSingleEntry( SearchRequest request, ResultHandler resultHandler, P p) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (connection == null) { throw new IllegalStateException(); } return connection.searchSingleEntry(request, resultHandler, p); } /** * {@inheritDoc} */ public

ResultFuture readRootDSE( ResultHandler handler, P p) throws UnsupportedOperationException, IllegalStateException { if (connection == null) { throw new IllegalStateException(); } return connection.readRootDSE(handler, p); } /** * {@inheritDoc} */ public

ResultFuture readSchemaForEntry(DN name, ResultHandler handler, P p) throws UnsupportedOperationException, IllegalStateException { if (connection == null) { throw new IllegalStateException(); } return connection.readSchemaForEntry(name, handler, p); } /** * {@inheritDoc} */ public

ResultFuture readSchema(DN name, ResultHandler handler, P p) throws UnsupportedOperationException, IllegalStateException { if (connection == null) { throw new IllegalStateException(); } return connection.readSchema(name, handler, p); } public void addConnectionEventListener( ConnectionEventListener listener) throws IllegalStateException, NullPointerException { if (connection == null) { throw new IllegalStateException(); } } public void removeConnectionEventListener( ConnectionEventListener listener) throws NullPointerException { if (connection == null) { throw new IllegalStateException(); } } /** * {@inheritDoc} */ public boolean isClosed() { return connection == null; } public void connectionReceivedUnsolicitedNotification( GenericExtendedResult notification) { // Ignore } public void connectionErrorOccurred( boolean isDisconnectNotification, ErrorResultException error) { synchronized (lock) { // Remove this connection from the pool if its in there pool.remove(this); numConnections--; connection.removeConnectionEventListener(this); // FIXME: should still close the connection, but we need to be // careful that users of the pooled connection get a sensible // error if they continue to use it (i.e. not an NPE or ISE). if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "Connection error occured: " + error.getMessage() + " numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); } } } } private static final class CompletedConnectionFuture implements ConnectionFuture { private final PooledConnectionWapper connection; private CompletedConnectionFuture(PooledConnectionWapper connection) { this.connection = connection; } public boolean cancel(boolean mayInterruptIfRunning) { return false; } public AsynchronousConnection get() throws InterruptedException, ErrorResultException { return connection; } public AsynchronousConnection get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ErrorResultException { return connection; } public boolean isCancelled() { return false; } public boolean isDone() { return true; } } private final class PendingConnectionFuture

implements ConnectionFuture { private volatile boolean isCancelled; private volatile PooledConnectionWapper connection; private volatile ErrorResultException err; private final ConnectionResultHandler handler; private final P p; private final CountDownLatch latch = new CountDownLatch(1); private PendingConnectionFuture( P p, ConnectionResultHandler handler) { this.handler = handler; this.p = p; } public synchronized boolean cancel(boolean mayInterruptIfRunning) { return pendingFutures.remove(this) && (isCancelled = true); } public AsynchronousConnection get() throws InterruptedException, ErrorResultException { latch.await(); if (err != null) { throw err; } return connection; } public AsynchronousConnection get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ErrorResultException { latch.await(timeout, unit); if (err != null) { throw err; } return connection; } public synchronized boolean isCancelled() { return isCancelled; } public boolean isDone() { return latch.getCount() == 0; } private void connection(PooledConnectionWapper connection) { this.connection = connection; if (handler != null) { handler.handleConnection(p, connection); } latch.countDown(); } private void error(ErrorResultException e) { this.err = e; if (handler != null) { handler.handleConnectionError(p, e); } latch.countDown(); } } /** * Creates a new connection pool which will maintain {@code poolSize} * connections created using the provided connection factory. * * @param connectionFactory * The connection factory to use for creating new * connections. * @param poolSize * The maximum size of the connection pool. */ public ConnectionPool(ConnectionFactory connectionFactory, int poolSize) { this.connectionFactory = connectionFactory; this.poolSize = poolSize; this.pool = new Stack(); this.pendingFutures = new ConcurrentLinkedQueue>(); } private final class WrapConnectionResultHandler implements ConnectionResultHandler { private final PendingConnectionFuture future; private WrapConnectionResultHandler( PendingConnectionFuture future) { this.future = future; } public void handleConnection(java.lang.Void p, AsynchronousConnection connection) { PooledConnectionWapper pooledConnection = new PooledConnectionWapper( connection); future.connection(pooledConnection); } public void handleConnectionError(java.lang.Void p, ErrorResultException error) { future.error(error); } } public

ConnectionFuture getAsynchronousConnection( ConnectionResultHandler handler, P p) { synchronized (lock) { // Check to see if we have a connection in the pool if (!pool.isEmpty()) { AsynchronousConnection conn = pool.pop(); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "Connection aquired from pool. " + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); } PooledConnectionWapper pooledConnection = new PooledConnectionWapper( conn); if (handler != null) { handler.handleConnection(p, pooledConnection); } return new CompletedConnectionFuture(pooledConnection); } PendingConnectionFuture

pendingFuture = new PendingConnectionFuture

( p, handler); // Pool was empty. Maybe a new connection if pool size is not // reached if (numConnections < poolSize) { numConnections++; WrapConnectionResultHandler wrapHandler = new WrapConnectionResultHandler( pendingFuture); connectionFactory.getAsynchronousConnection(wrapHandler, null); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "New connection established and aquired. " + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); } } else { // Have to wait pendingFutures.add(pendingFuture); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "No connections available. Wait-listed" + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); } } return pendingFuture; } } }