/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009-2010 Sun Microsystems, Inc. */ package org.opends.sdk; import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import org.opends.sdk.requests.*; import org.opends.sdk.responses.*; import org.opends.sdk.schema.Schema; import com.sun.opends.sdk.util.AsynchronousFutureResult; import com.sun.opends.sdk.util.CompletedFutureResult; import com.sun.opends.sdk.util.StaticUtils; import com.sun.opends.sdk.util.Validator; /** * A simple connection pool implementation. */ final class ConnectionPool extends AbstractConnectionFactory { /** * This result handler is invoked when an attempt to add a new connection to * the pool completes. */ private final class ConnectionResultHandler implements ResultHandler { /** * {@inheritDoc} */ @Override public void handleErrorResult(final ErrorResultException error) { // Connection attempt failed, so decrease the pool size. currentPoolSize.release(); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format( "Connection attempt failed: " + error.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } QueueElement holder; synchronized (queue) { if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) { // No waiting futures. return; } else { holder = queue.removeFirst(); } } // There was waiting future, so close it. holder.getWaitingFuture().handleErrorResult(error); } /** * {@inheritDoc} */ @Override public void handleResult(final AsynchronousConnection connection) { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format( "Connection attempt succeeded: " + " currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } publishConnection(connection); } } /** * A pooled connection is passed to the client. It wraps an underlying * "pooled" connection obtained from the underlying factory and lasts until * the client application closes this connection. More specifically, pooled * connections are not actually stored in the internal queue. */ private final class PooledConnection implements AsynchronousConnection { // Connection event listeners registed against this pooled connection should // have the same life time as the pooled connection. private final List listeners = new CopyOnWriteArrayList(); private final AsynchronousConnection connection; private final AtomicBoolean isClosed = new AtomicBoolean(false); PooledConnection(final AsynchronousConnection connection) { this.connection = connection; } /** * {@inheritDoc} */ @Override public FutureResult abandon(final AbandonRequest request) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.abandon(request); } /** * {@inheritDoc} */ @Override public FutureResult add(final AddRequest request, final ResultHandler handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.add(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult add(final AddRequest request, final ResultHandler resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection .add(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public void addConnectionEventListener( final ConnectionEventListener listener) throws IllegalStateException, NullPointerException { Validator.ensureNotNull(listener); if (isClosed()) { throw new IllegalStateException(); } listeners.add(listener); } /** * {@inheritDoc} */ @Override public FutureResult bind(final BindRequest request, final ResultHandler handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.bind(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult bind(final BindRequest request, final ResultHandler resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.bind(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public void close() { if (!isClosed.compareAndSet(false, true)) { // Already closed. return; } // Don't put invalid connections back in the pool. if (connection.isValid()) { publishConnection(connection); } else { // The connection may have been disconnected by the remote server, but // the server may still be available. In order to avoid leaving pending // futures hanging indefinitely, we should try to reconnect immediately. // Close the dead connection. connection.close(); // Try to get a new connection to replace it. factory.getAsynchronousConnection(connectionResultHandler); if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG.warning(String.format( "Connection no longer valid. " + "currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } } } /** * {@inheritDoc} */ @Override public void close(final UnbindRequest request, final String reason) throws NullPointerException { close(); } /** * {@inheritDoc} */ @Override public FutureResult compare(final CompareRequest request, final ResultHandler handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.compare(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult compare(final CompareRequest request, final ResultHandler resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.compare(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public FutureResult delete(final DeleteRequest request, final ResultHandler handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.delete(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult delete(final DeleteRequest request, final ResultHandler resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.delete(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public FutureResult extendedRequest( final ExtendedRequest request, final ResultHandler handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.extendedRequest(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult extendedRequest( final ExtendedRequest request, final ResultHandler resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.extendedRequest(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public Connection getSynchronousConnection() { return new SynchronousConnection(this); } /** * {@inheritDoc} */ @Override public boolean isClosed() { return isClosed.get(); } /** * {@inheritDoc} */ @Override public boolean isValid() { return connection.isValid() && !isClosed(); } /** * {@inheritDoc} */ @Override public FutureResult modify(final ModifyRequest request, final ResultHandler handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.modify(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult modify(final ModifyRequest request, final ResultHandler resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.modify(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public FutureResult modifyDN(final ModifyDNRequest request, final ResultHandler handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.modifyDN(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult modifyDN(final ModifyDNRequest request, final ResultHandler resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.modifyDN(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public FutureResult readEntry(final DN name, final Collection attributeDescriptions, final ResultHandler resultHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.readEntry(name, attributeDescriptions, resultHandler); } /** * {@inheritDoc} */ @Override public FutureResult readRootDSE( final ResultHandler handler) throws UnsupportedOperationException, IllegalStateException { if (isClosed()) { throw new IllegalStateException(); } return connection.readRootDSE(handler); } /** * {@inheritDoc} */ @Override public FutureResult readSchema(final DN name, final ResultHandler handler) throws UnsupportedOperationException, IllegalStateException { if (isClosed()) { throw new IllegalStateException(); } return connection.readSchema(name, handler); } /** * {@inheritDoc} */ @Override public FutureResult readSchemaForEntry(final DN name, final ResultHandler handler) throws UnsupportedOperationException, IllegalStateException { if (isClosed()) { throw new IllegalStateException(); } return connection.readSchemaForEntry(name, handler); } /** * {@inheritDoc} */ @Override public void removeConnectionEventListener( final ConnectionEventListener listener) throws NullPointerException { Validator.ensureNotNull(listener); if (isClosed()) { throw new IllegalStateException(); } listeners.remove(listener); } /** * {@inheritDoc} */ @Override public FutureResult search(final SearchRequest request, final SearchResultHandler handler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.search(request, handler); } /** * {@inheritDoc} */ @Override public FutureResult search(final SearchRequest request, final SearchResultHandler resultHandler, final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.search(request, resultHandler, intermediateResponseHandler); } /** * {@inheritDoc} */ @Override public FutureResult searchSingleEntry( final SearchRequest request, final ResultHandler resultHandler) throws UnsupportedOperationException, IllegalStateException, NullPointerException { if (isClosed()) { throw new IllegalStateException(); } return connection.searchSingleEntry(request, resultHandler); } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("PooledConnection("); builder.append(connection); builder.append(')'); return builder.toString(); } } /** * A queue element is either a pending connection request future awaiting an * {@code AsynchronousConnection} or it is an unused * {@code AsynchronousConnection} awaiting a connection request. */ private static final class QueueElement { private final Object value; QueueElement(final AsynchronousConnection connection) { this.value = connection; } QueueElement(final ResultHandler handler) { this.value = new AsynchronousFutureResult(handler); } AsynchronousConnection getWaitingConnection() { if (value instanceof AsynchronousConnection) { return (AsynchronousConnection) value; } else { throw new IllegalStateException(); } } @SuppressWarnings("unchecked") AsynchronousFutureResult getWaitingFuture() { if (value instanceof AsynchronousFutureResult) { return (AsynchronousFutureResult) value; } else { throw new IllegalStateException(); } } boolean isWaitingFuture() { return value instanceof AsynchronousFutureResult; } public String toString() { return String.valueOf(value); } } // Guarded by queue. private final LinkedList queue = new LinkedList(); private final ConnectionFactory factory; private final int poolSize; private final Semaphore currentPoolSize; private final ResultHandler connectionResultHandler = new ConnectionResultHandler(); /** * Creates a new connection pool which will maintain {@code poolSize} * connections created using the provided connection factory. * * @param factory * The connection factory to use for creating new connections. * @param poolSize * The maximum size of the connection pool. */ ConnectionPool(final ConnectionFactory factory, final int poolSize) { this.factory = factory; this.poolSize = poolSize; this.currentPoolSize = new Semaphore(poolSize); } /** * {@inheritDoc} */ @Override public FutureResult getAsynchronousConnection( final ResultHandler handler) { QueueElement holder; synchronized (queue) { if (queue.isEmpty() || queue.getFirst().isWaitingFuture()) { holder = new QueueElement(handler); queue.add(holder); } else { holder = queue.removeFirst(); } } if (!holder.isWaitingFuture()) { // There was a completed connection attempt. final AsynchronousConnection connection = holder.getWaitingConnection(); final PooledConnection pooledConnection = new PooledConnection(connection); if (handler != null) { handler.handleResult(pooledConnection); } return new CompletedFutureResult(pooledConnection); } else { // Grow the pool if needed. final FutureResult future = holder .getWaitingFuture(); if (!future.isDone() && currentPoolSize.tryAcquire()) { factory.getAsynchronousConnection(connectionResultHandler); } return future; } } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("ConnectionPool("); builder.append(String.valueOf(factory)); builder.append(','); builder.append(poolSize); builder.append(')'); return builder.toString(); } private void publishConnection(final AsynchronousConnection connection) { QueueElement holder; synchronized (queue) { if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) { holder = new QueueElement(connection); queue.add(holder); return; } else { holder = queue.removeFirst(); } } // There was waiting future, so close it. final PooledConnection pooledConnection = new PooledConnection(connection); holder.getWaitingFuture().handleResult(pooledConnection); } }