/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opendj3/legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opendj3/legal-notices/CDDLv1_0.txt. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS
 */

package org.forgerock.opendj.ldap;

import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;

import org.forgerock.opendj.ldap.requests.*;
import org.forgerock.opendj.ldap.responses.*;
import org.forgerock.opendj.ldif.ConnectionEntryReader;

import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.CompletedFutureResult;
import com.forgerock.opendj.util.StaticUtils;
import com.forgerock.opendj.util.Validator;

/**
 * A simple connection pool implementation which maintains a fixed number of
 * connections.
 * <p>
 * The pool keeps a single queue which holds either idle connections or
 * pending connection requests (never a mixture seen from the head), and a
 * semaphore whose acquired permits represent connections that exist or are
 * being established.
 */
final class FixedConnectionPool implements ConnectionPool {

  /**
   * This result handler is invoked when an attempt to add a new connection to
   * the pool completes.
   */
  private final class ConnectionResultHandler implements
      ResultHandler<Connection> {

    /**
     * {@inheritDoc}
     */
    @Override
    public void handleErrorResult(final ErrorResultException error) {
      // Connection attempt failed, so decrease the pool size.
      currentPoolSize.release();

      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
        // The error message is supplied as a format argument, not spliced
        // into the pattern: a message containing '%' must not be interpreted
        // as a format specifier.
        StaticUtils.DEBUG_LOG.fine(String.format(
            "Connection attempt failed: %s currentPoolSize=%d, poolSize=%d",
            error.getMessage(),
            poolSize - currentPoolSize.availablePermits(), poolSize));
      }

      QueueElement holder;
      synchronized (queue) {
        if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) {
          // No waiting futures.
          return;
        } else {
          holder = queue.removeFirst();
        }
      }

      // There was a waiting future, so fail it with the connection error.
      // This is done outside the lock because it runs client handler code.
      holder.getWaitingFuture().handleErrorResult(error);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void handleResult(final Connection connection) {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
        StaticUtils.DEBUG_LOG.fine(String.format(
            "Connection attempt succeeded: "
                + " currentPoolSize=%d, poolSize=%d",
            poolSize - currentPoolSize.availablePermits(), poolSize));
      }
      publishConnection(connection);
    }
  }

  /**
   * A pooled connection is passed to the client. It wraps an underlying
   * "pooled" connection obtained from the underlying factory and lasts until
   * the client application closes this connection. More specifically, pooled
   * connections are not actually stored in the internal queue.
   * <p>
   * Every operation other than {@code close}, {@code isClosed},
   * {@code isValid} and {@code toString} first checks that this wrapper has
   * not been closed and then delegates to the wrapped connection.
   */
  private final class PooledConnection implements Connection {
    // Connection event listeners registered against this pooled connection
    // should have the same life time as the pooled connection.
    private final List<ConnectionEventListener> listeners =
        new CopyOnWriteArrayList<ConnectionEventListener>();

    // The underlying connection which is returned to the pool on close.
    private final Connection connection;

    // Set once by the first call to close(); later calls are no-ops.
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    PooledConnection(final Connection connection) {
      this.connection = connection;
    }

    @Override
    public FutureResult<Void> abandonAsync(final AbandonRequest request) {
      return checkState().abandonAsync(request);
    }

    @Override
    public Result add(final AddRequest request) throws ErrorResultException,
        InterruptedException {
      return checkState().add(request);
    }

    @Override
    public Result add(final Entry entry) throws ErrorResultException,
        InterruptedException {
      return checkState().add(entry);
    }

    @Override
    public Result add(final String... ldifLines) throws ErrorResultException,
        InterruptedException {
      return checkState().add(ldifLines);
    }

    @Override
    public FutureResult<Result> addAsync(final AddRequest request,
        final IntermediateResponseHandler intermediateResponseHandler,
        final ResultHandler<? super Result> resultHandler) {
      return checkState().addAsync(request, intermediateResponseHandler,
          resultHandler);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void addConnectionEventListener(
        final ConnectionEventListener listener) {
      Validator.ensureNotNull(listener);
      checkState();
      listeners.add(listener);
    }

    @Override
    public BindResult bind(final BindRequest request)
        throws ErrorResultException, InterruptedException {
      return checkState().bind(request);
    }

    @Override
    public BindResult bind(final String name, final char[] password)
        throws ErrorResultException, InterruptedException {
      return checkState().bind(name, password);
    }

    @Override
    public FutureResult<BindResult> bindAsync(final BindRequest request,
        final IntermediateResponseHandler intermediateResponseHandler,
        final ResultHandler<? super BindResult> resultHandler) {
      return checkState().bindAsync(request, intermediateResponseHandler,
          resultHandler);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Closing a pooled connection does not close the underlying connection
     * while it is still valid; it is handed back to the pool instead. An
     * invalid underlying connection is closed and a replacement is requested
     * from the factory.
     */
    @Override
    public void close() {
      if (!isClosed.compareAndSet(false, true)) {
        // Already closed.
        return;
      }

      // Don't put invalid connections back in the pool.
      if (connection.isValid()) {
        publishConnection(connection);
      } else {
        // The connection may have been disconnected by the remote server, but
        // the server may still be available. In order to avoid leaving pending
        // futures hanging indefinitely, we should try to reconnect immediately.

        // Close the dead connection.
        connection.close();

        // Try to get a new connection to replace it. The permit held for the
        // dead connection is not released here; the result handler releases
        // it if the attempt fails.
        factory.getConnectionAsync(connectionResultHandler);

        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) {
          StaticUtils.DEBUG_LOG.warning(String.format(
              "Connection no longer valid. "
                  + "currentPoolSize=%d, poolSize=%d",
              poolSize - currentPoolSize.availablePermits(), poolSize));
        }
      }
    }

    /**
     * {@inheritDoc}
     * <p>
     * The unbind request and reason are ignored; this behaves exactly like
     * {@link #close()}.
     */
    @Override
    public void close(final UnbindRequest request, final String reason) {
      close();
    }

    @Override
    public CompareResult compare(final CompareRequest request)
        throws ErrorResultException, InterruptedException {
      return checkState().compare(request);
    }

    @Override
    public CompareResult compare(final String name,
        final String attributeDescription, final String assertionValue)
        throws ErrorResultException, InterruptedException {
      return checkState().compare(name, attributeDescription, assertionValue);
    }

    @Override
    public FutureResult<CompareResult> compareAsync(
        final CompareRequest request,
        final IntermediateResponseHandler intermediateResponseHandler,
        final ResultHandler<? super CompareResult> resultHandler) {
      return checkState().compareAsync(request, intermediateResponseHandler,
          resultHandler);
    }

    @Override
    public Result delete(final DeleteRequest request)
        throws ErrorResultException, InterruptedException {
      return checkState().delete(request);
    }

    @Override
    public Result delete(final String name) throws ErrorResultException,
        InterruptedException {
      return checkState().delete(name);
    }

    @Override
    public FutureResult<Result> deleteAsync(final DeleteRequest request,
        final IntermediateResponseHandler intermediateResponseHandler,
        final ResultHandler<? super Result> resultHandler) {
      return checkState().deleteAsync(request, intermediateResponseHandler,
          resultHandler);
    }

    @Override
    public <R extends ExtendedResult> R extendedRequest(
        final ExtendedRequest<R> request) throws ErrorResultException,
        InterruptedException {
      return checkState().extendedRequest(request);
    }

    @Override
    public <R extends ExtendedResult> R extendedRequest(
        final ExtendedRequest<R> request,
        final IntermediateResponseHandler handler)
        throws ErrorResultException, InterruptedException {
      return checkState().extendedRequest(request, handler);
    }

    @Override
    public GenericExtendedResult extendedRequest(final String requestName,
        final ByteString requestValue) throws ErrorResultException,
        InterruptedException {
      return checkState().extendedRequest(requestName, requestValue);
    }

    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
        final ExtendedRequest<R> request,
        final IntermediateResponseHandler intermediateResponseHandler,
        final ResultHandler<? super R> resultHandler) {
      return checkState().extendedRequestAsync(request,
          intermediateResponseHandler, resultHandler);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isClosed() {
      return isClosed.get();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isValid() {
      return connection.isValid() && !isClosed();
    }

    @Override
    public Result modify(final ModifyRequest request)
        throws ErrorResultException, InterruptedException {
      return checkState().modify(request);
    }

    @Override
    public Result modify(final String... ldifLines)
        throws ErrorResultException, InterruptedException {
      return checkState().modify(ldifLines);
    }

    @Override
    public FutureResult<Result> modifyAsync(final ModifyRequest request,
        final IntermediateResponseHandler intermediateResponseHandler,
        final ResultHandler<? super Result> resultHandler) {
      return checkState().modifyAsync(request, intermediateResponseHandler,
          resultHandler);
    }

    @Override
    public Result modifyDN(final ModifyDNRequest request)
        throws ErrorResultException, InterruptedException {
      return checkState().modifyDN(request);
    }

    @Override
    public Result modifyDN(final String name, final String newRDN)
        throws ErrorResultException, InterruptedException {
      return checkState().modifyDN(name, newRDN);
    }

    @Override
    public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
        final IntermediateResponseHandler intermediateResponseHandler,
        final ResultHandler<? super Result> resultHandler) {
      return checkState().modifyDNAsync(request, intermediateResponseHandler,
          resultHandler);
    }

    @Override
    public SearchResultEntry readEntry(final DN name,
        final String... attributeDescriptions) throws ErrorResultException,
        InterruptedException {
      return checkState().readEntry(name, attributeDescriptions);
    }

    @Override
    public SearchResultEntry readEntry(final String name,
        final String... attributeDescriptions) throws ErrorResultException,
        InterruptedException {
      return checkState().readEntry(name, attributeDescriptions);
    }

    @Override
    public FutureResult<SearchResultEntry> readEntryAsync(final DN name,
        final Collection<String> attributeDescriptions,
        final ResultHandler<? super SearchResultEntry> handler) {
      return checkState().readEntryAsync(name, attributeDescriptions, handler);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeConnectionEventListener(
        final ConnectionEventListener listener) {
      Validator.ensureNotNull(listener);
      checkState();
      listeners.remove(listener);
    }

    @Override
    public ConnectionEntryReader search(final SearchRequest request) {
      return checkState().search(request);
    }

    @Override
    public Result search(final SearchRequest request,
        final Collection<? super SearchResultEntry> entries)
        throws ErrorResultException, InterruptedException {
      return checkState().search(request, entries);
    }

    @Override
    public Result search(final SearchRequest request,
        final Collection<? super SearchResultEntry> entries,
        final Collection<? super SearchResultReference> references)
        throws ErrorResultException, InterruptedException {
      return checkState().search(request, entries, references);
    }

    @Override
    public Result search(final SearchRequest request,
        final SearchResultHandler handler) throws ErrorResultException,
        InterruptedException {
      return checkState().search(request, handler);
    }

    @Override
    public ConnectionEntryReader search(final String baseObject,
        final SearchScope scope, final String filter,
        final String... attributeDescriptions) {
      return checkState().search(baseObject, scope, filter,
          attributeDescriptions);
    }

    @Override
    public FutureResult<Result> searchAsync(final SearchRequest request,
        final IntermediateResponseHandler intermediateResponseHandler,
        final SearchResultHandler resultHandler) {
      return checkState().searchAsync(request, intermediateResponseHandler,
          resultHandler);
    }

    @Override
    public SearchResultEntry searchSingleEntry(final SearchRequest request)
        throws ErrorResultException, InterruptedException {
      return checkState().searchSingleEntry(request);
    }

    @Override
    public SearchResultEntry searchSingleEntry(final String baseObject,
        final SearchScope scope, final String filter,
        final String... attributeDescriptions) throws ErrorResultException,
        InterruptedException {
      return checkState().searchSingleEntry(baseObject, scope, filter,
          attributeDescriptions);
    }

    @Override
    public FutureResult<SearchResultEntry> searchSingleEntryAsync(
        final SearchRequest request,
        final ResultHandler<? super SearchResultEntry> handler) {
      return checkState().searchSingleEntryAsync(request, handler);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
      final StringBuilder builder = new StringBuilder();
      builder.append("PooledConnection(");
      builder.append(connection);
      builder.append(')');
      return builder.toString();
    }

    /**
     * Checks that this pooled connection has not been closed.
     *
     * @return The underlying connection to which the operation should be
     *         delegated.
     * @throws IllegalStateException
     *           If this pooled connection has already been closed.
     */
    private Connection checkState() {
      if (isClosed()) {
        throw new IllegalStateException();
      }
      return connection;
    }
  }

  /**
   * A queue element is either a pending connection request future awaiting an
   * {@code Connection} or it is an unused {@code Connection} awaiting a
   * connection request.
   */
  private static final class QueueElement {
    // Either a Connection or an AsynchronousFutureResult<Connection>.
    private final Object value;

    /**
     * Creates a queue element holding an idle connection.
     *
     * @param connection
     *          The idle connection.
     */
    QueueElement(final Connection connection) {
      this.value = connection;
    }

    /**
     * Creates a queue element holding a new pending connection request future
     * wrapping the provided handler.
     *
     * @param handler
     *          The client's result handler; callers in this class may pass
     *          {@code null}.
     */
    QueueElement(final ResultHandler<? super Connection> handler) {
      this.value = new AsynchronousFutureResult<Connection>(handler);
    }

    @Override
    public String toString() {
      return String.valueOf(value);
    }

    /**
     * Returns the idle connection held by this element.
     *
     * @return The idle connection.
     * @throws IllegalStateException
     *           If this element does not hold a connection.
     */
    Connection getWaitingConnection() {
      if (value instanceof Connection) {
        return (Connection) value;
      } else {
        throw new IllegalStateException();
      }
    }

    /**
     * Returns the pending connection request future held by this element.
     *
     * @return The pending future.
     * @throws IllegalStateException
     *           If this element does not hold a future.
     */
    // The cast is safe: the only future ever stored in value is the
    // AsynchronousFutureResult<Connection> created by the constructor.
    @SuppressWarnings("unchecked")
    AsynchronousFutureResult<Connection> getWaitingFuture() {
      if (value instanceof AsynchronousFutureResult) {
        return (AsynchronousFutureResult<Connection>) value;
      } else {
        throw new IllegalStateException();
      }
    }

    /**
     * Indicates whether this element holds a pending connection request
     * future rather than an idle connection.
     *
     * @return {@code true} if this element holds a future.
     */
    boolean isWaitingFuture() {
      return value instanceof AsynchronousFutureResult;
    }
  }

  // Guarded by queue.
  private final LinkedList<QueueElement> queue =
      new LinkedList<QueueElement>();

  // Guarded by queue.
  private boolean isClosed = false;

  private final ConnectionFactory factory;

  private final int poolSize;

  // Starts with poolSize permits; a permit is acquired before each new
  // connection attempt and released when an attempt fails or a connection is
  // closed by the pool, so (poolSize - availablePermits) is the current size.
  private final Semaphore currentPoolSize;

  private final ResultHandler<Connection> connectionResultHandler =
      new ConnectionResultHandler();

  /**
   * Creates a new connection pool which will maintain {@code poolSize}
   * connections created using the provided connection factory.
   *
   * @param factory
   *          The connection factory to use for creating new connections.
   * @param poolSize
   *          The maximum size of the connection pool.
   */
  FixedConnectionPool(final ConnectionFactory factory, final int poolSize) {
    this.factory = factory;
    this.poolSize = poolSize;
    this.currentPoolSize = new Semaphore(poolSize);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Idle connections are closed immediately. Connections which are currently
   * in use are closed when they are returned to the pool. Calling this method
   * more than once has no additional effect.
   */
  @Override
  public void close() {
    final LinkedList<Connection> idleConnections;
    synchronized (queue) {
      if (isClosed) {
        return;
      }
      isClosed = true;

      // Remove any connections which are waiting in the queue as these can be
      // closed immediately.
      idleConnections = new LinkedList<Connection>();
      while (!queue.isEmpty() && !queue.getFirst().isWaitingFuture()) {
        final QueueElement holder = queue.removeFirst();
        idleConnections.add(holder.getWaitingConnection());
      }
    }

    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
      StaticUtils.DEBUG_LOG.fine(String.format(
          "Connection pool is closing: currentPoolSize=%d, poolSize=%d",
          poolSize - currentPoolSize.availablePermits(), poolSize));
    }

    // Close the idle connections outside of the lock.
    for (final Connection connection : idleConnections) {
      closeConnection(connection);
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public Connection getConnection() throws ErrorResultException,
      InterruptedException {
    return getConnectionAsync(null).get();
  }

  /**
   * {@inheritDoc}
   *
   * @throws IllegalStateException
   *           If this connection pool has already been closed.
   */
  @Override
  public FutureResult<Connection> getConnectionAsync(
      final ResultHandler<? super Connection> handler) {
    QueueElement holder;
    synchronized (queue) {
      if (isClosed) {
        throw new IllegalStateException(
            "FixedConnectionPool is already closed");
      }

      if (queue.isEmpty() || queue.getFirst().isWaitingFuture()) {
        // No idle connection is available: enqueue a pending request.
        holder = new QueueElement(handler);
        queue.add(holder);
      } else {
        // Take the idle connection at the head of the queue.
        holder = queue.removeFirst();
      }
    }

    if (!holder.isWaitingFuture()) {
      // There was a completed connection attempt.
      final Connection connection = holder.getWaitingConnection();
      final PooledConnection pooledConnection =
          new PooledConnection(connection);
      if (handler != null) {
        handler.handleResult(pooledConnection);
      }
      return new CompletedFutureResult<Connection>(pooledConnection);
    } else {
      // Grow the pool if needed. No new attempt is started when the future
      // has already been completed or when no permit is available.
      final FutureResult<Connection> future = holder.getWaitingFuture();
      if (!future.isDone() && currentPoolSize.tryAcquire()) {
        factory.getConnectionAsync(connectionResultHandler);
      }
      return future;
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public String toString() {
    final StringBuilder builder = new StringBuilder();
    builder.append("FixedConnectionPool(");
    builder.append(String.valueOf(factory));
    builder.append(',');
    builder.append(poolSize);
    builder.append(')');
    return builder.toString();
  }

  /**
   * Provide a finalizer because connection pools are expensive resources to
   * accidentally leave around. Also, since they won't be created all that
   * frequently, there's little risk of overloading the finalizer.
   */
  @Override
  protected void finalize() throws Throwable {
    close();
  }

  /**
   * Closes the provided underlying connection on behalf of a closing pool and
   * releases its permit.
   *
   * @param connection
   *          The underlying connection to close.
   */
  private void closeConnection(final Connection connection) {
    // The connection will be closed, so decrease the pool size.
    currentPoolSize.release();
    connection.close();

    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
      StaticUtils.DEBUG_LOG.fine(String.format(
          "Closing connection because connection pool is closing: "
              + " currentPoolSize=%d, poolSize=%d",
          poolSize - currentPoolSize.availablePermits(), poolSize));
    }
  }

  /**
   * Makes an underlying connection available to clients. The connection is
   * handed to the oldest pending request if there is one, otherwise it is
   * queued as idle. If the pool has been closed the connection is closed
   * instead, and any pending request taken from the queue is failed.
   *
   * @param connection
   *          The underlying connection to publish.
   */
  private void publishConnection(final Connection connection) {
    final QueueElement holder;
    boolean connectionPoolIsClosing = false;

    synchronized (queue) {
      if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) {
        // Nobody is waiting for a connection.
        if (isClosed) {
          connectionPoolIsClosing = true;
          holder = null;
        } else {
          holder = new QueueElement(connection);
          queue.add(holder);
          return;
        }
      } else {
        // Take the oldest pending request.
        connectionPoolIsClosing = isClosed;
        holder = queue.removeFirst();
      }
    }

    // Futures are completed outside of the lock because doing so runs client
    // handler code.
    if (connectionPoolIsClosing) {
      closeConnection(connection);

      if (holder != null) {
        final ErrorResultException e =
            ErrorResultException.newErrorResult(
                ResultCode.CLIENT_SIDE_USER_CANCELLED,
                ERR_CONNECTION_POOL_CLOSING.get(toString()).toString());
        holder.getWaitingFuture().handleErrorResult(e);

        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
          // Message passed as an argument so that a '%' inside it cannot be
          // interpreted as a format specifier.
          StaticUtils.DEBUG_LOG.fine(String.format(
              "Connection attempt failed: %s currentPoolSize=%d, poolSize=%d",
              e.getMessage(),
              poolSize - currentPoolSize.availablePermits(), poolSize));
        }
      }
    } else {
      // There was a waiting future, so complete it.
      final PooledConnection pooledConnection =
          new PooledConnection(connection);
      holder.getWaitingFuture().handleResult(pooledConnection);
    }
  }
}