/*
|
* CDDL HEADER START
|
*
|
* The contents of this file are subject to the terms of the
|
* Common Development and Distribution License, Version 1.0 only
|
* (the "License"). You may not use this file except in compliance
|
* with the License.
|
*
|
* You can obtain a copy of the license at
|
* trunk/opendj3/legal-notices/CDDLv1_0.txt
|
* or http://forgerock.org/license/CDDLv1.0.html.
|
* See the License for the specific language governing permissions
|
* and limitations under the License.
|
*
|
* When distributing Covered Code, include this CDDL HEADER in each
|
* file and include the License file at
|
* trunk/opendj3/legal-notices/CDDLv1_0.txt. If applicable,
|
* add the following below this CDDL HEADER, with the fields enclosed
|
* by brackets "[]" replaced with your own identifying information:
|
* Portions Copyright [yyyy] [name of copyright owner]
|
*
|
* CDDL HEADER END
|
*
|
*
|
* Copyright 2009-2010 Sun Microsystems, Inc.
|
* Portions copyright 2011 ForgeRock AS
|
*/
|
|
package org.forgerock.opendj.ldap;
|
|
|
|
import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;
|
|
import java.util.Collection;
|
import java.util.LinkedList;
|
import java.util.List;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.logging.Level;
|
|
import org.forgerock.opendj.ldap.requests.*;
|
import org.forgerock.opendj.ldap.responses.*;
|
|
import com.forgerock.opendj.util.*;
|
|
|
|
/**
|
* A simple connection pool implementation which maintains a fixed number of
|
* connections.
|
*/
|
final class FixedConnectionPool extends AbstractConnectionFactory implements
|
ConnectionPool
|
{
|
|
/**
|
* This result handler is invoked when an attempt to add a new connection to
|
* the pool completes.
|
*/
|
private final class ConnectionResultHandler implements
|
ResultHandler<AsynchronousConnection>
|
{
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public void handleErrorResult(final ErrorResultException error)
|
{
|
// Connection attempt failed, so decrease the pool size.
|
currentPoolSize.release();
|
|
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
|
{
|
StaticUtils.DEBUG_LOG.fine(String.format("Connection attempt failed: "
|
+ error.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize
|
- currentPoolSize.availablePermits(), poolSize));
|
}
|
|
QueueElement holder;
|
synchronized (queue)
|
{
|
if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
|
{
|
// No waiting futures.
|
return;
|
}
|
else
|
{
|
holder = queue.removeFirst();
|
}
|
}
|
|
// There was waiting future, so close it.
|
holder.getWaitingFuture().handleErrorResult(error);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public void handleResult(final AsynchronousConnection connection)
|
{
|
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
|
{
|
StaticUtils.DEBUG_LOG.fine(String.format(
|
"Connection attempt succeeded: "
|
+ " currentPoolSize=%d, poolSize=%d", poolSize
|
- currentPoolSize.availablePermits(), poolSize));
|
}
|
|
publishConnection(connection);
|
}
|
}
|
|
|
|
/**
|
* A pooled connection is passed to the client. It wraps an underlying
|
* "pooled" connection obtained from the underlying factory and lasts until
|
* the client application closes this connection. More specifically, pooled
|
* connections are not actually stored in the internal queue.
|
*/
|
private final class PooledConnection implements AsynchronousConnection
|
{
|
// Connection event listeners registed against this pooled connection should
|
// have the same life time as the pooled connection.
|
private final List<ConnectionEventListener> listeners =
|
new CopyOnWriteArrayList<ConnectionEventListener>();
|
|
private final AsynchronousConnection connection;
|
|
private final AtomicBoolean isClosed = new AtomicBoolean(false);
|
|
|
|
PooledConnection(final AsynchronousConnection connection)
|
{
|
this.connection = connection;
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<Void> abandon(final AbandonRequest request)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.abandon(request);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<Result> add(final AddRequest request,
|
final ResultHandler<? super Result> handler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.add(request, handler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<Result> add(final AddRequest request,
|
final ResultHandler<? super Result> resultHandler,
|
final IntermediateResponseHandler intermediateResponseHandler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection
|
.add(request, resultHandler, intermediateResponseHandler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public void addConnectionEventListener(
|
final ConnectionEventListener listener) throws IllegalStateException,
|
NullPointerException
|
{
|
Validator.ensureNotNull(listener);
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
listeners.add(listener);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<BindResult> bind(final BindRequest request,
|
final ResultHandler<? super BindResult> handler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.bind(request, handler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<BindResult> bind(final BindRequest request,
|
final ResultHandler<? super BindResult> resultHandler,
|
final IntermediateResponseHandler intermediateResponseHandler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.bind(request, resultHandler,
|
intermediateResponseHandler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public void close()
|
{
|
if (!isClosed.compareAndSet(false, true))
|
{
|
// Already closed.
|
return;
|
}
|
|
// Don't put invalid connections back in the pool.
|
if (connection.isValid())
|
{
|
publishConnection(connection);
|
}
|
else
|
{
|
// The connection may have been disconnected by the remote server, but
|
// the server may still be available. In order to avoid leaving pending
|
// futures hanging indefinitely, we should try to reconnect immediately.
|
|
// Close the dead connection.
|
connection.close();
|
|
// Try to get a new connection to replace it.
|
factory.getAsynchronousConnection(connectionResultHandler);
|
|
if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
|
{
|
StaticUtils.DEBUG_LOG.warning(String.format(
|
"Connection no longer valid. "
|
+ "currentPoolSize=%d, poolSize=%d", poolSize
|
- currentPoolSize.availablePermits(), poolSize));
|
}
|
}
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public void close(final UnbindRequest request, final String reason)
|
throws NullPointerException
|
{
|
close();
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<CompareResult> compare(final CompareRequest request,
|
final ResultHandler<? super CompareResult> handler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.compare(request, handler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<CompareResult> compare(final CompareRequest request,
|
final ResultHandler<? super CompareResult> resultHandler,
|
final IntermediateResponseHandler intermediateResponseHandler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.compare(request, resultHandler,
|
intermediateResponseHandler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<Result> delete(final DeleteRequest request,
|
final ResultHandler<? super Result> handler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.delete(request, handler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<Result> delete(final DeleteRequest request,
|
final ResultHandler<? super Result> resultHandler,
|
final IntermediateResponseHandler intermediateResponseHandler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.delete(request, resultHandler,
|
intermediateResponseHandler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public <R extends ExtendedResult> FutureResult<R> extendedRequest(
|
final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.extendedRequest(request, handler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public <R extends ExtendedResult> FutureResult<R> extendedRequest(
|
final ExtendedRequest<R> request,
|
final ResultHandler<? super R> resultHandler,
|
final IntermediateResponseHandler intermediateResponseHandler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.extendedRequest(request, resultHandler,
|
intermediateResponseHandler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public Connection getSynchronousConnection()
|
{
|
return new SynchronousConnection(this);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public boolean isClosed()
|
{
|
return isClosed.get();
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public boolean isValid()
|
{
|
return connection.isValid() && !isClosed();
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<Result> modify(final ModifyRequest request,
|
final ResultHandler<? super Result> handler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.modify(request, handler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<Result> modify(final ModifyRequest request,
|
final ResultHandler<? super Result> resultHandler,
|
final IntermediateResponseHandler intermediateResponseHandler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.modify(request, resultHandler,
|
intermediateResponseHandler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<Result> modifyDN(final ModifyDNRequest request,
|
final ResultHandler<? super Result> handler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.modifyDN(request, handler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<Result> modifyDN(final ModifyDNRequest request,
|
final ResultHandler<? super Result> resultHandler,
|
final IntermediateResponseHandler intermediateResponseHandler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.modifyDN(request, resultHandler,
|
intermediateResponseHandler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<SearchResultEntry> readEntry(final DN name,
|
final Collection<String> attributeDescriptions,
|
final ResultHandler<? super SearchResultEntry> resultHandler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.readEntry(name, attributeDescriptions, resultHandler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public void removeConnectionEventListener(
|
final ConnectionEventListener listener) throws NullPointerException
|
{
|
Validator.ensureNotNull(listener);
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
listeners.remove(listener);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<Result> search(final SearchRequest request,
|
final SearchResultHandler handler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.search(request, handler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<Result> search(final SearchRequest request,
|
final SearchResultHandler resultHandler,
|
final IntermediateResponseHandler intermediateResponseHandler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.search(request, resultHandler,
|
intermediateResponseHandler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<SearchResultEntry> searchSingleEntry(
|
final SearchRequest request,
|
final ResultHandler<? super SearchResultEntry> resultHandler)
|
throws UnsupportedOperationException, IllegalStateException,
|
NullPointerException
|
{
|
if (isClosed())
|
{
|
throw new IllegalStateException();
|
}
|
return connection.searchSingleEntry(request, resultHandler);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public String toString()
|
{
|
final StringBuilder builder = new StringBuilder();
|
builder.append("PooledConnection(");
|
builder.append(connection);
|
builder.append(')');
|
return builder.toString();
|
}
|
}
|
|
|
|
/**
|
* A queue element is either a pending connection request future awaiting an
|
* {@code AsynchronousConnection} or it is an unused
|
* {@code AsynchronousConnection} awaiting a connection request.
|
*/
|
private static final class QueueElement
|
{
|
private final Object value;
|
|
|
|
QueueElement(final AsynchronousConnection connection)
|
{
|
this.value = connection;
|
}
|
|
|
|
QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
|
{
|
this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
|
}
|
|
|
|
@Override
|
public String toString()
|
{
|
return String.valueOf(value);
|
}
|
|
|
|
AsynchronousConnection getWaitingConnection()
|
{
|
if (value instanceof AsynchronousConnection)
|
{
|
return (AsynchronousConnection) value;
|
}
|
else
|
{
|
throw new IllegalStateException();
|
}
|
}
|
|
|
|
@SuppressWarnings("unchecked")
|
AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
|
{
|
if (value instanceof AsynchronousFutureResult)
|
{
|
return (AsynchronousFutureResult<AsynchronousConnection>) value;
|
}
|
else
|
{
|
throw new IllegalStateException();
|
}
|
}
|
|
|
|
boolean isWaitingFuture()
|
{
|
return value instanceof AsynchronousFutureResult;
|
}
|
}
|
|
|
|
// Guarded by queue.
|
private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
|
|
// Guarded by queue.
|
private boolean isClosed = false;
|
|
private final ConnectionFactory factory;
|
|
private final int poolSize;
|
|
private final Semaphore currentPoolSize;
|
|
private final ResultHandler<AsynchronousConnection> connectionResultHandler =
|
new ConnectionResultHandler();
|
|
|
|
/**
|
* Creates a new connection pool which will maintain {@code poolSize}
|
* connections created using the provided connection factory.
|
*
|
* @param factory
|
* The connection factory to use for creating new connections.
|
* @param poolSize
|
* The maximum size of the connection pool.
|
*/
|
FixedConnectionPool(final ConnectionFactory factory, final int poolSize)
|
{
|
this.factory = factory;
|
this.poolSize = poolSize;
|
this.currentPoolSize = new Semaphore(poolSize);
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public void close()
|
{
|
final LinkedList<AsynchronousConnection> idleConnections;
|
synchronized (queue)
|
{
|
if (isClosed)
|
{
|
return;
|
}
|
isClosed = true;
|
|
// Remove any connections which are waiting in the queue as these can be
|
// closed immediately.
|
idleConnections = new LinkedList<AsynchronousConnection>();
|
while (!queue.isEmpty() && !queue.getFirst().isWaitingFuture())
|
{
|
final QueueElement holder = queue.removeFirst();
|
idleConnections.add(holder.getWaitingConnection());
|
}
|
}
|
|
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
|
{
|
StaticUtils.DEBUG_LOG.fine(String.format(
|
"Connection pool is closing: currentPoolSize=%d, poolSize=%d",
|
poolSize - currentPoolSize.availablePermits(), poolSize));
|
}
|
|
// Close the idle connections.
|
for (final AsynchronousConnection connection : idleConnections)
|
{
|
closeConnection(connection);
|
}
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public FutureResult<AsynchronousConnection> getAsynchronousConnection(
|
final ResultHandler<? super AsynchronousConnection> handler)
|
{
|
QueueElement holder;
|
synchronized (queue)
|
{
|
if (isClosed)
|
{
|
throw new IllegalStateException("FixedConnectionPool is already closed");
|
}
|
|
if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
|
{
|
holder = new QueueElement(handler);
|
queue.add(holder);
|
}
|
else
|
{
|
holder = queue.removeFirst();
|
}
|
}
|
|
if (!holder.isWaitingFuture())
|
{
|
// There was a completed connection attempt.
|
final AsynchronousConnection connection = holder.getWaitingConnection();
|
final PooledConnection pooledConnection = new PooledConnection(connection);
|
if (handler != null)
|
{
|
handler.handleResult(pooledConnection);
|
}
|
return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
|
}
|
else
|
{
|
// Grow the pool if needed.
|
final FutureResult<AsynchronousConnection> future = holder
|
.getWaitingFuture();
|
if (!future.isDone() && currentPoolSize.tryAcquire())
|
{
|
factory.getAsynchronousConnection(connectionResultHandler);
|
}
|
return future;
|
}
|
}
|
|
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public String toString()
|
{
|
final StringBuilder builder = new StringBuilder();
|
builder.append("FixedConnectionPool(");
|
builder.append(String.valueOf(factory));
|
builder.append(',');
|
builder.append(poolSize);
|
builder.append(')');
|
return builder.toString();
|
}
|
|
|
|
/**
|
* Provide a finalizer because connection pools are expensive resources to
|
* accidentally leave around. Also, since they won't be created all that
|
* frequently, there's little risk of overloading the finalizer.
|
*/
|
@Override
|
protected void finalize() throws Throwable
|
{
|
close();
|
}
|
|
|
|
private void closeConnection(final AsynchronousConnection connection)
|
{
|
// The connection will be closed, so decrease the pool size.
|
currentPoolSize.release();
|
connection.close();
|
|
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
|
{
|
StaticUtils.DEBUG_LOG.fine(String.format(
|
"Closing connection because connection pool is closing: "
|
+ " currentPoolSize=%d, poolSize=%d",
|
poolSize - currentPoolSize.availablePermits(), poolSize));
|
}
|
}
|
|
|
|
private void publishConnection(final AsynchronousConnection connection)
|
{
|
final QueueElement holder;
|
boolean connectionPoolIsClosing = false;
|
|
synchronized (queue)
|
{
|
if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
|
{
|
if (isClosed)
|
{
|
connectionPoolIsClosing = true;
|
holder = null;
|
}
|
else
|
{
|
holder = new QueueElement(connection);
|
queue.add(holder);
|
return;
|
}
|
}
|
else
|
{
|
connectionPoolIsClosing = isClosed;
|
holder = queue.removeFirst();
|
}
|
}
|
|
// There was waiting future, so complete it.
|
if (connectionPoolIsClosing)
|
{
|
closeConnection(connection);
|
|
if (holder != null)
|
{
|
final ErrorResultException e = ErrorResultException.newErrorResult(
|
ResultCode.CLIENT_SIDE_USER_CANCELLED, ERR_CONNECTION_POOL_CLOSING
|
.get(toString()).toString());
|
holder.getWaitingFuture().handleErrorResult(e);
|
|
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
|
{
|
StaticUtils.DEBUG_LOG.fine(String.format(
|
"Connection attempt failed: " + e.getMessage()
|
+ " currentPoolSize=%d, poolSize=%d", poolSize
|
- currentPoolSize.availablePermits(), poolSize));
|
}
|
}
|
}
|
else
|
{
|
final PooledConnection pooledConnection = new PooledConnection(connection);
|
holder.getWaitingFuture().handleResult(pooledConnection);
|
}
|
}
|
}
|