From 263d085885df024dca9250cc03c807912b0a7662 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 24 Apr 2012 22:33:21 +0000
Subject: [PATCH] Reformat to comply with new Checkstyle rules.
---
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java | 1477 ++++++++++++++++++++++++----------------------------------
1 files changed, 613 insertions(+), 864 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
index aff47bd..04227a7 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -6,17 +6,16 @@
* (the "License"). You may not use this file except in compliance
* with the License.
*
- * You can obtain a copy of the license at
- * trunk/opendj3/legal-notices/CDDLv1_0.txt
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
* or http://forgerock.org/license/CDDLv1.0.html.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opendj3/legal-notices/CDDLv1_0.txt. If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
@@ -28,8 +27,6 @@
package org.forgerock.opendj.ldap;
-
-
import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;
import java.util.Collection;
@@ -40,8 +37,23 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
-import org.forgerock.opendj.ldap.requests.*;
-import org.forgerock.opendj.ldap.responses.*;
+import org.forgerock.opendj.ldap.requests.AbandonRequest;
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
+import org.forgerock.opendj.ldap.responses.BindResult;
+import org.forgerock.opendj.ldap.responses.CompareResult;
+import org.forgerock.opendj.ldap.responses.ExtendedResult;
+import org.forgerock.opendj.ldap.responses.GenericExtendedResult;
+import org.forgerock.opendj.ldap.responses.Result;
+import org.forgerock.opendj.ldap.responses.SearchResultEntry;
+import org.forgerock.opendj.ldap.responses.SearchResultReference;
import org.forgerock.opendj.ldif.ConnectionEntryReader;
import com.forgerock.opendj.util.AsynchronousFutureResult;
@@ -49,902 +61,639 @@
import com.forgerock.opendj.util.StaticUtils;
import com.forgerock.opendj.util.Validator;
-
-
/**
* A simple connection pool implementation which maintains a fixed number of
* connections.
*/
-final class FixedConnectionPool implements ConnectionPool
-{
+final class FixedConnectionPool implements ConnectionPool {
- /**
- * This result handler is invoked when an attempt to add a new connection to
- * the pool completes.
- */
- private final class ConnectionResultHandler implements
- ResultHandler<Connection>
- {
/**
- * {@inheritDoc}
+ * This result handler is invoked when an attempt to add a new connection to
+ * the pool completes.
*/
- @Override
- public void handleErrorResult(final ErrorResultException error)
- {
- // Connection attempt failed, so decrease the pool size.
- currentPoolSize.release();
+ private final class ConnectionResultHandler implements ResultHandler<Connection> {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleErrorResult(final ErrorResultException error) {
+ // Connection attempt failed, so decrease the pool size.
+ currentPoolSize.release();
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.fine(String.format("Connection attempt failed: "
- + error.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
- }
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
+ StaticUtils.DEBUG_LOG.fine(String.format("Connection attempt failed: "
+ + error.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize
+ - currentPoolSize.availablePermits(), poolSize));
+ }
- QueueElement holder;
- synchronized (queue)
- {
- if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
- {
- // No waiting futures.
- return;
+ QueueElement holder;
+ synchronized (queue) {
+ if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) {
+ // No waiting futures.
+ return;
+ } else {
+ holder = queue.removeFirst();
+ }
+ }
+
+ // There was waiting future, so close it.
+ holder.getWaitingFuture().handleErrorResult(error);
}
- else
- {
- holder = queue.removeFirst();
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleResult(final Connection connection) {
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
+ StaticUtils.DEBUG_LOG.fine(String.format("Connection attempt succeeded: "
+ + " currentPoolSize=%d, poolSize=%d", poolSize
+ - currentPoolSize.availablePermits(), poolSize));
+ }
+
+ publishConnection(connection);
}
- }
-
- // There was waiting future, so close it.
- holder.getWaitingFuture().handleErrorResult(error);
}
+ /**
+ * A pooled connection is passed to the client. It wraps an underlying
+ * "pooled" connection obtained from the underlying factory and lasts until
+ * the client application closes this connection. More specifically, pooled
+ * connections are not actually stored in the internal queue.
+ */
+ private final class PooledConnection implements Connection {
+ // Connection event listeners registed against this pooled connection
+ // should
+ // have the same life time as the pooled connection.
+ private final List<ConnectionEventListener> listeners =
+ new CopyOnWriteArrayList<ConnectionEventListener>();
+ private final Connection connection;
+
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+ PooledConnection(final Connection connection) {
+ this.connection = connection;
+ }
+
+ @Override
+ public FutureResult<Void> abandonAsync(final AbandonRequest request) {
+ return checkState().abandonAsync(request);
+ }
+
+ @Override
+ public Result add(final AddRequest request) throws ErrorResultException,
+ InterruptedException {
+ return checkState().add(request);
+ }
+
+ @Override
+ public Result add(final Entry entry) throws ErrorResultException, InterruptedException {
+ return checkState().add(entry);
+ }
+
+ @Override
+ public Result add(final String... ldifLines) throws ErrorResultException,
+ InterruptedException {
+ return checkState().add(ldifLines);
+ }
+
+ @Override
+ public FutureResult<Result> addAsync(final AddRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler) {
+ return checkState().addAsync(request, intermediateResponseHandler, resultHandler);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void addConnectionEventListener(final ConnectionEventListener listener) {
+ Validator.ensureNotNull(listener);
+ checkState();
+ listeners.add(listener);
+ }
+
+ @Override
+ public BindResult bind(final BindRequest request) throws ErrorResultException,
+ InterruptedException {
+ return checkState().bind(request);
+ }
+
+ @Override
+ public BindResult bind(final String name, final char[] password)
+ throws ErrorResultException, InterruptedException {
+ return checkState().bind(name, password);
+ }
+
+ @Override
+ public FutureResult<BindResult> bindAsync(final BindRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super BindResult> resultHandler) {
+ return checkState().bindAsync(request, intermediateResponseHandler, resultHandler);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() {
+ if (!isClosed.compareAndSet(false, true)) {
+ // Already closed.
+ return;
+ }
+
+ // Don't put invalid connections back in the pool.
+ if (connection.isValid()) {
+ publishConnection(connection);
+ } else {
+ // The connection may have been disconnected by the remote
+ // server, but
+ // the server may still be available. In order to avoid leaving
+ // pending
+ // futures hanging indefinitely, we should try to reconnect
+ // immediately.
+
+ // Close the dead connection.
+ connection.close();
+
+ // Try to get a new connection to replace it.
+ factory.getConnectionAsync(connectionResultHandler);
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) {
+ StaticUtils.DEBUG_LOG.warning(String.format("Connection no longer valid. "
+ + "currentPoolSize=%d, poolSize=%d", poolSize
+ - currentPoolSize.availablePermits(), poolSize));
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close(final UnbindRequest request, final String reason) {
+ close();
+ }
+
+ @Override
+ public CompareResult compare(final CompareRequest request) throws ErrorResultException,
+ InterruptedException {
+ return checkState().compare(request);
+ }
+
+ @Override
+ public CompareResult compare(final String name, final String attributeDescription,
+ final String assertionValue) throws ErrorResultException, InterruptedException {
+ return checkState().compare(name, attributeDescription, assertionValue);
+ }
+
+ @Override
+ public FutureResult<CompareResult> compareAsync(final CompareRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super CompareResult> resultHandler) {
+ return checkState().compareAsync(request, intermediateResponseHandler, resultHandler);
+ }
+
+ @Override
+ public Result delete(final DeleteRequest request) throws ErrorResultException,
+ InterruptedException {
+ return checkState().delete(request);
+ }
+
+ @Override
+ public Result delete(final String name) throws ErrorResultException, InterruptedException {
+ return checkState().delete(name);
+ }
+
+ @Override
+ public FutureResult<Result> deleteAsync(final DeleteRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler) {
+ return checkState().deleteAsync(request, intermediateResponseHandler, resultHandler);
+ }
+
+ @Override
+ public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request)
+ throws ErrorResultException, InterruptedException {
+ return checkState().extendedRequest(request);
+ }
+
+ @Override
+ public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request,
+ final IntermediateResponseHandler handler) throws ErrorResultException,
+ InterruptedException {
+ return checkState().extendedRequest(request, handler);
+ }
+
+ @Override
+ public GenericExtendedResult extendedRequest(final String requestName,
+ final ByteString requestValue) throws ErrorResultException, InterruptedException {
+ return checkState().extendedRequest(requestName, requestValue);
+ }
+
+ @Override
+ public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
+ final ExtendedRequest<R> request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super R> resultHandler) {
+ return checkState().extendedRequestAsync(request, intermediateResponseHandler,
+ resultHandler);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isClosed() {
+ return isClosed.get();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isValid() {
+ return connection.isValid() && !isClosed();
+ }
+
+ @Override
+ public Result modify(final ModifyRequest request) throws ErrorResultException,
+ InterruptedException {
+ return checkState().modify(request);
+ }
+
+ @Override
+ public Result modify(final String... ldifLines) throws ErrorResultException,
+ InterruptedException {
+ return checkState().modify(ldifLines);
+ }
+
+ @Override
+ public FutureResult<Result> modifyAsync(final ModifyRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler) {
+ return checkState().modifyAsync(request, intermediateResponseHandler, resultHandler);
+ }
+
+ @Override
+ public Result modifyDN(final ModifyDNRequest request) throws ErrorResultException,
+ InterruptedException {
+ return checkState().modifyDN(request);
+ }
+
+ @Override
+ public Result modifyDN(final String name, final String newRDN) throws ErrorResultException,
+ InterruptedException {
+ return checkState().modifyDN(name, newRDN);
+ }
+
+ @Override
+ public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler) {
+ return checkState().modifyDNAsync(request, intermediateResponseHandler, resultHandler);
+ }
+
+ @Override
+ public SearchResultEntry readEntry(final DN name, final String... attributeDescriptions)
+ throws ErrorResultException, InterruptedException {
+ return checkState().readEntry(name, attributeDescriptions);
+ }
+
+ @Override
+ public SearchResultEntry readEntry(final String name, final String... attributeDescriptions)
+ throws ErrorResultException, InterruptedException {
+ return checkState().readEntry(name, attributeDescriptions);
+ }
+
+ @Override
+ public FutureResult<SearchResultEntry> readEntryAsync(final DN name,
+ final Collection<String> attributeDescriptions,
+ final ResultHandler<? super SearchResultEntry> handler) {
+ return checkState().readEntryAsync(name, attributeDescriptions, handler);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void removeConnectionEventListener(final ConnectionEventListener listener) {
+ Validator.ensureNotNull(listener);
+ checkState();
+ listeners.remove(listener);
+ }
+
+ @Override
+ public ConnectionEntryReader search(final SearchRequest request) {
+ return checkState().search(request);
+ }
+
+ @Override
+ public Result search(final SearchRequest request,
+ final Collection<? super SearchResultEntry> entries) throws ErrorResultException,
+ InterruptedException {
+ return checkState().search(request, entries);
+ }
+
+ @Override
+ public Result search(final SearchRequest request,
+ final Collection<? super SearchResultEntry> entries,
+ final Collection<? super SearchResultReference> references)
+ throws ErrorResultException, InterruptedException {
+ return checkState().search(request, entries, references);
+ }
+
+ @Override
+ public Result search(final SearchRequest request, final SearchResultHandler handler)
+ throws ErrorResultException, InterruptedException {
+ return checkState().search(request, handler);
+ }
+
+ @Override
+ public ConnectionEntryReader search(final String baseObject, final SearchScope scope,
+ final String filter, final String... attributeDescriptions) {
+ return checkState().search(baseObject, scope, filter, attributeDescriptions);
+ }
+
+ @Override
+ public FutureResult<Result> searchAsync(final SearchRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final SearchResultHandler resultHandler) {
+ return checkState().searchAsync(request, intermediateResponseHandler, resultHandler);
+ }
+
+ @Override
+ public SearchResultEntry searchSingleEntry(final SearchRequest request)
+ throws ErrorResultException, InterruptedException {
+ return checkState().searchSingleEntry(request);
+ }
+
+ @Override
+ public SearchResultEntry searchSingleEntry(final String baseObject,
+ final SearchScope scope, final String filter, final String... attributeDescriptions)
+ throws ErrorResultException, InterruptedException {
+ return checkState().searchSingleEntry(baseObject, scope, filter, attributeDescriptions);
+ }
+
+ @Override
+ public FutureResult<SearchResultEntry> searchSingleEntryAsync(final SearchRequest request,
+ final ResultHandler<? super SearchResultEntry> handler) {
+ return checkState().searchSingleEntryAsync(request, handler);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("PooledConnection(");
+ builder.append(connection);
+ builder.append(')');
+ return builder.toString();
+ }
+
+ // Checks that this pooled connection has not been closed.
+ private Connection checkState() {
+ if (isClosed()) {
+ throw new IllegalStateException();
+ }
+ return connection;
+ }
+
+ }
+
+ /**
+ * A queue element is either a pending connection request future awaiting an
+ * {@code Connection} or it is an unused {@code Connection} awaiting a
+ * connection request.
+ */
+ private static final class QueueElement {
+ private final Object value;
+
+ QueueElement(final Connection connection) {
+ this.value = connection;
+ }
+
+ QueueElement(final ResultHandler<? super Connection> handler) {
+ this.value = new AsynchronousFutureResult<Connection>(handler);
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(value);
+ }
+
+ Connection getWaitingConnection() {
+ if (value instanceof Connection) {
+ return (Connection) value;
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ AsynchronousFutureResult<Connection> getWaitingFuture() {
+ if (value instanceof AsynchronousFutureResult) {
+ return (AsynchronousFutureResult<Connection>) value;
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+
+ boolean isWaitingFuture() {
+ return value instanceof AsynchronousFutureResult;
+ }
+ }
+
+ // Guarded by queue.
+ private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
+
+ // Guarded by queue.
+ private boolean isClosed = false;
+
+ private final ConnectionFactory factory;
+
+ private final int poolSize;
+
+ private final Semaphore currentPoolSize;
+
+ private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
+
+ /**
+ * Creates a new connection pool which will maintain {@code poolSize}
+ * connections created using the provided connection factory.
+ *
+ * @param factory
+ * The connection factory to use for creating new connections.
+ * @param poolSize
+ * The maximum size of the connection pool.
+ */
+ FixedConnectionPool(final ConnectionFactory factory, final int poolSize) {
+ this.factory = factory;
+ this.poolSize = poolSize;
+ this.currentPoolSize = new Semaphore(poolSize);
+ }
/**
* {@inheritDoc}
*/
@Override
- public void handleResult(final Connection connection)
- {
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.fine(String.format(
- "Connection attempt succeeded: "
- + " currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
- }
+ public void close() {
+ final LinkedList<Connection> idleConnections;
+ synchronized (queue) {
+ if (isClosed) {
+ return;
+ }
+ isClosed = true;
- publishConnection(connection);
+ // Remove any connections which are waiting in the queue as these
+ // can be
+ // closed immediately.
+ idleConnections = new LinkedList<Connection>();
+ while (!queue.isEmpty() && !queue.getFirst().isWaitingFuture()) {
+ final QueueElement holder = queue.removeFirst();
+ idleConnections.add(holder.getWaitingConnection());
+ }
+ }
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
+ StaticUtils.DEBUG_LOG.fine(String.format(
+ "Connection pool is closing: currentPoolSize=%d, poolSize=%d", poolSize
+ - currentPoolSize.availablePermits(), poolSize));
+ }
+
+ // Close the idle connections.
+ for (final Connection connection : idleConnections) {
+ closeConnection(connection);
+ }
}
- }
-
-
-
- /**
- * A pooled connection is passed to the client. It wraps an underlying
- * "pooled" connection obtained from the underlying factory and lasts until
- * the client application closes this connection. More specifically, pooled
- * connections are not actually stored in the internal queue.
- */
- private final class PooledConnection implements Connection
- {
- // Connection event listeners registed against this pooled connection should
- // have the same life time as the pooled connection.
- private final List<ConnectionEventListener> listeners =
- new CopyOnWriteArrayList<ConnectionEventListener>();
-
- private final Connection connection;
-
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
-
-
- PooledConnection(final Connection connection)
- {
- this.connection = connection;
- }
-
-
-
- @Override
- public FutureResult<Void> abandonAsync(final AbandonRequest request)
- {
- return checkState().abandonAsync(request);
- }
-
-
-
- @Override
- public Result add(final AddRequest request) throws ErrorResultException,
- InterruptedException
- {
- return checkState().add(request);
- }
-
-
-
- @Override
- public Result add(final Entry entry) throws ErrorResultException,
- InterruptedException
- {
- return checkState().add(entry);
- }
-
-
-
- @Override
- public Result add(final String... ldifLines) throws ErrorResultException,
- InterruptedException
- {
- return checkState().add(ldifLines);
- }
-
-
-
- @Override
- public FutureResult<Result> addAsync(final AddRequest request,
- final IntermediateResponseHandler intermediateResponseHandler,
- final ResultHandler<? super Result> resultHandler)
- {
- return checkState().addAsync(request, intermediateResponseHandler,
- resultHandler);
- }
-
-
/**
* {@inheritDoc}
*/
@Override
- public void addConnectionEventListener(
- final ConnectionEventListener listener)
- {
- Validator.ensureNotNull(listener);
- checkState();
- listeners.add(listener);
+ public Connection getConnection() throws ErrorResultException, InterruptedException {
+ return getConnectionAsync(null).get();
}
-
-
- @Override
- public BindResult bind(final BindRequest request)
- throws ErrorResultException, InterruptedException
- {
- return checkState().bind(request);
- }
-
-
-
- @Override
- public BindResult bind(final String name, final char[] password)
- throws ErrorResultException, InterruptedException
- {
- return checkState().bind(name, password);
- }
-
-
-
- @Override
- public FutureResult<BindResult> bindAsync(final BindRequest request,
- final IntermediateResponseHandler intermediateResponseHandler,
- final ResultHandler<? super BindResult> resultHandler)
- {
- return checkState().bindAsync(request, intermediateResponseHandler,
- resultHandler);
- }
-
-
-
/**
* {@inheritDoc}
*/
@Override
- public void close()
- {
- if (!isClosed.compareAndSet(false, true))
- {
- // Already closed.
- return;
- }
+ public FutureResult<Connection> getConnectionAsync(
+ final ResultHandler<? super Connection> handler) {
+ QueueElement holder;
+ synchronized (queue) {
+ if (isClosed) {
+ throw new IllegalStateException("FixedConnectionPool is already closed");
+ }
- // Don't put invalid connections back in the pool.
- if (connection.isValid())
- {
- publishConnection(connection);
- }
- else
- {
- // The connection may have been disconnected by the remote server, but
- // the server may still be available. In order to avoid leaving pending
- // futures hanging indefinitely, we should try to reconnect immediately.
+ if (queue.isEmpty() || queue.getFirst().isWaitingFuture()) {
+ holder = new QueueElement(handler);
+ queue.add(holder);
+ } else {
+ holder = queue.removeFirst();
+ }
+ }
- // Close the dead connection.
+ if (!holder.isWaitingFuture()) {
+ // There was a completed connection attempt.
+ final Connection connection = holder.getWaitingConnection();
+ final PooledConnection pooledConnection = new PooledConnection(connection);
+ if (handler != null) {
+ handler.handleResult(pooledConnection);
+ }
+ return new CompletedFutureResult<Connection>(pooledConnection);
+ } else {
+ // Grow the pool if needed.
+ final FutureResult<Connection> future = holder.getWaitingFuture();
+ if (!future.isDone() && currentPoolSize.tryAcquire()) {
+ factory.getConnectionAsync(connectionResultHandler);
+ }
+ return future;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("FixedConnectionPool(");
+ builder.append(String.valueOf(factory));
+ builder.append(',');
+ builder.append(poolSize);
+ builder.append(')');
+ return builder.toString();
+ }
+
+ /**
+ * Provide a finalizer because connection pools are expensive resources to
+ * accidentally leave around. Also, since they won't be created all that
+ * frequently, there's little risk of overloading the finalizer.
+ */
+ @Override
+ protected void finalize() throws Throwable {
+ close();
+ }
+
+ private void closeConnection(final Connection connection) {
+ // The connection will be closed, so decrease the pool size.
+ currentPoolSize.release();
connection.close();
- // Try to get a new connection to replace it.
- factory.getConnectionAsync(connectionResultHandler);
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
- {
- StaticUtils.DEBUG_LOG.warning(String.format(
- "Connection no longer valid. "
- + "currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
+ StaticUtils.DEBUG_LOG.fine(String.format(
+ "Closing connection because connection pool is closing: "
+ + " currentPoolSize=%d, poolSize=%d", poolSize
+ - currentPoolSize.availablePermits(), poolSize));
}
- }
}
+ private void publishConnection(final Connection connection) {
+ final QueueElement holder;
+ boolean connectionPoolIsClosing = false;
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void close(final UnbindRequest request, final String reason)
- {
- close();
- }
-
-
-
- @Override
- public CompareResult compare(final CompareRequest request)
- throws ErrorResultException, InterruptedException
- {
- return checkState().compare(request);
- }
-
-
-
- @Override
- public CompareResult compare(final String name,
- final String attributeDescription, final String assertionValue)
- throws ErrorResultException, InterruptedException
- {
- return checkState().compare(name, attributeDescription, assertionValue);
- }
-
-
-
- @Override
- public FutureResult<CompareResult> compareAsync(
- final CompareRequest request,
- final IntermediateResponseHandler intermediateResponseHandler,
- final ResultHandler<? super CompareResult> resultHandler)
- {
- return checkState().compareAsync(request, intermediateResponseHandler,
- resultHandler);
- }
-
-
-
- @Override
- public Result delete(final DeleteRequest request)
- throws ErrorResultException, InterruptedException
- {
- return checkState().delete(request);
- }
-
-
-
- @Override
- public Result delete(final String name) throws ErrorResultException,
- InterruptedException
- {
- return checkState().delete(name);
- }
-
-
-
- @Override
- public FutureResult<Result> deleteAsync(final DeleteRequest request,
- final IntermediateResponseHandler intermediateResponseHandler,
- final ResultHandler<? super Result> resultHandler)
- {
- return checkState().deleteAsync(request, intermediateResponseHandler,
- resultHandler);
- }
-
-
-
- @Override
- public <R extends ExtendedResult> R extendedRequest(
- final ExtendedRequest<R> request) throws ErrorResultException,
- InterruptedException
- {
- return checkState().extendedRequest(request);
- }
-
-
-
- @Override
- public <R extends ExtendedResult> R extendedRequest(
- final ExtendedRequest<R> request,
- final IntermediateResponseHandler handler) throws ErrorResultException,
- InterruptedException
- {
- return checkState().extendedRequest(request, handler);
- }
-
-
-
- @Override
- public GenericExtendedResult extendedRequest(final String requestName,
- final ByteString requestValue) throws ErrorResultException,
- InterruptedException
- {
- return checkState().extendedRequest(requestName, requestValue);
- }
-
-
-
- @Override
- public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
- final ExtendedRequest<R> request,
- final IntermediateResponseHandler intermediateResponseHandler,
- final ResultHandler<? super R> resultHandler)
- {
- return checkState().extendedRequestAsync(request,
- intermediateResponseHandler, resultHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isClosed()
- {
- return isClosed.get();
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isValid()
- {
- return connection.isValid() && !isClosed();
- }
-
-
-
- @Override
- public Result modify(final ModifyRequest request)
- throws ErrorResultException, InterruptedException
- {
- return checkState().modify(request);
- }
-
-
-
- @Override
- public Result modify(final String... ldifLines)
- throws ErrorResultException, InterruptedException
- {
- return checkState().modify(ldifLines);
- }
-
-
-
- @Override
- public FutureResult<Result> modifyAsync(final ModifyRequest request,
- final IntermediateResponseHandler intermediateResponseHandler,
- final ResultHandler<? super Result> resultHandler)
- {
- return checkState().modifyAsync(request, intermediateResponseHandler,
- resultHandler);
- }
-
-
-
- @Override
- public Result modifyDN(final ModifyDNRequest request)
- throws ErrorResultException, InterruptedException
- {
- return checkState().modifyDN(request);
- }
-
-
-
- @Override
- public Result modifyDN(final String name, final String newRDN)
- throws ErrorResultException, InterruptedException
- {
- return checkState().modifyDN(name, newRDN);
- }
-
-
-
- @Override
- public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
- final IntermediateResponseHandler intermediateResponseHandler,
- final ResultHandler<? super Result> resultHandler)
- {
- return checkState().modifyDNAsync(request, intermediateResponseHandler,
- resultHandler);
- }
-
-
-
- @Override
- public SearchResultEntry readEntry(final DN name,
- final String... attributeDescriptions) throws ErrorResultException,
- InterruptedException
- {
- return checkState().readEntry(name, attributeDescriptions);
- }
-
-
-
- @Override
- public SearchResultEntry readEntry(final String name,
- final String... attributeDescriptions) throws ErrorResultException,
- InterruptedException
- {
- return checkState().readEntry(name, attributeDescriptions);
- }
-
-
-
- @Override
- public FutureResult<SearchResultEntry> readEntryAsync(final DN name,
- final Collection<String> attributeDescriptions,
- final ResultHandler<? super SearchResultEntry> handler)
- {
- return checkState().readEntryAsync(name, attributeDescriptions, handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void removeConnectionEventListener(
- final ConnectionEventListener listener)
- {
- Validator.ensureNotNull(listener);
- checkState();
- listeners.remove(listener);
- }
-
-
-
- @Override
- public ConnectionEntryReader search(final SearchRequest request)
- {
- return checkState().search(request);
- }
-
-
-
- @Override
- public Result search(final SearchRequest request,
- final Collection<? super SearchResultEntry> entries)
- throws ErrorResultException, InterruptedException
- {
- return checkState().search(request, entries);
- }
-
-
-
- @Override
- public Result search(final SearchRequest request,
- final Collection<? super SearchResultEntry> entries,
- final Collection<? super SearchResultReference> references)
- throws ErrorResultException, InterruptedException
- {
- return checkState().search(request, entries, references);
- }
-
-
-
- @Override
- public Result search(final SearchRequest request,
- final SearchResultHandler handler) throws ErrorResultException,
- InterruptedException
- {
- return checkState().search(request, handler);
- }
-
-
-
- @Override
- public ConnectionEntryReader search(final String baseObject,
- final SearchScope scope, final String filter,
- final String... attributeDescriptions)
- {
- return checkState().search(baseObject, scope, filter,
- attributeDescriptions);
- }
-
-
-
- @Override
- public FutureResult<Result> searchAsync(final SearchRequest request,
- final IntermediateResponseHandler intermediateResponseHandler,
- final SearchResultHandler resultHandler)
- {
- return checkState().searchAsync(request, intermediateResponseHandler,
- resultHandler);
- }
-
-
-
- @Override
- public SearchResultEntry searchSingleEntry(final SearchRequest request)
- throws ErrorResultException, InterruptedException
- {
- return checkState().searchSingleEntry(request);
- }
-
-
-
- @Override
- public SearchResultEntry searchSingleEntry(final String baseObject,
- final SearchScope scope, final String filter,
- final String... attributeDescriptions) throws ErrorResultException,
- InterruptedException
- {
- return checkState().searchSingleEntry(baseObject, scope, filter,
- attributeDescriptions);
- }
-
-
-
- @Override
- public FutureResult<SearchResultEntry> searchSingleEntryAsync(
- final SearchRequest request,
- final ResultHandler<? super SearchResultEntry> handler)
- {
- return checkState().searchSingleEntryAsync(request, handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String toString()
- {
- final StringBuilder builder = new StringBuilder();
- builder.append("PooledConnection(");
- builder.append(connection);
- builder.append(')');
- return builder.toString();
- }
-
-
-
- // Checks that this pooled connection has not been closed.
- private Connection checkState()
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection;
- }
-
- }
-
-
-
- /**
- * A queue element is either a pending connection request future awaiting an
- * {@code Connection} or it is an unused {@code Connection} awaiting a
- * connection request.
- */
- private static final class QueueElement
- {
- private final Object value;
-
-
-
- QueueElement(final Connection connection)
- {
- this.value = connection;
- }
-
-
-
- QueueElement(final ResultHandler<? super Connection> handler)
- {
- this.value = new AsynchronousFutureResult<Connection>(handler);
- }
-
-
-
- @Override
- public String toString()
- {
- return String.valueOf(value);
- }
-
-
-
- Connection getWaitingConnection()
- {
- if (value instanceof Connection)
- {
- return (Connection) value;
- }
- else
- {
- throw new IllegalStateException();
- }
- }
-
-
-
- @SuppressWarnings("unchecked")
- AsynchronousFutureResult<Connection> getWaitingFuture()
- {
- if (value instanceof AsynchronousFutureResult)
- {
- return (AsynchronousFutureResult<Connection>) value;
- }
- else
- {
- throw new IllegalStateException();
- }
- }
-
-
-
- boolean isWaitingFuture()
- {
- return value instanceof AsynchronousFutureResult;
- }
- }
-
-
-
- // Guarded by queue.
- private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
-
- // Guarded by queue.
- private boolean isClosed = false;
-
- private final ConnectionFactory factory;
-
- private final int poolSize;
-
- private final Semaphore currentPoolSize;
-
- private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
-
-
-
- /**
- * Creates a new connection pool which will maintain {@code poolSize}
- * connections created using the provided connection factory.
- *
- * @param factory
- * The connection factory to use for creating new connections.
- * @param poolSize
- * The maximum size of the connection pool.
- */
- FixedConnectionPool(final ConnectionFactory factory, final int poolSize)
- {
- this.factory = factory;
- this.poolSize = poolSize;
- this.currentPoolSize = new Semaphore(poolSize);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void close()
- {
- final LinkedList<Connection> idleConnections;
- synchronized (queue)
- {
- if (isClosed)
- {
- return;
- }
- isClosed = true;
-
- // Remove any connections which are waiting in the queue as these can be
- // closed immediately.
- idleConnections = new LinkedList<Connection>();
- while (!queue.isEmpty() && !queue.getFirst().isWaitingFuture())
- {
- final QueueElement holder = queue.removeFirst();
- idleConnections.add(holder.getWaitingConnection());
- }
- }
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.fine(String.format(
- "Connection pool is closing: currentPoolSize=%d, poolSize=%d",
- poolSize - currentPoolSize.availablePermits(), poolSize));
- }
-
- // Close the idle connections.
- for (final Connection connection : idleConnections)
- {
- closeConnection(connection);
- }
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Connection getConnection() throws ErrorResultException,
- InterruptedException
- {
- return getConnectionAsync(null).get();
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<Connection> getConnectionAsync(
- final ResultHandler<? super Connection> handler)
- {
- QueueElement holder;
- synchronized (queue)
- {
- if (isClosed)
- {
- throw new IllegalStateException("FixedConnectionPool is already closed");
- }
-
- if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
- {
- holder = new QueueElement(handler);
- queue.add(holder);
- }
- else
- {
- holder = queue.removeFirst();
- }
- }
-
- if (!holder.isWaitingFuture())
- {
- // There was a completed connection attempt.
- final Connection connection = holder.getWaitingConnection();
- final PooledConnection pooledConnection = new PooledConnection(connection);
- if (handler != null)
- {
- handler.handleResult(pooledConnection);
- }
- return new CompletedFutureResult<Connection>(pooledConnection);
- }
- else
- {
- // Grow the pool if needed.
- final FutureResult<Connection> future = holder.getWaitingFuture();
- if (!future.isDone() && currentPoolSize.tryAcquire())
- {
- factory.getConnectionAsync(connectionResultHandler);
- }
- return future;
- }
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String toString()
- {
- final StringBuilder builder = new StringBuilder();
- builder.append("FixedConnectionPool(");
- builder.append(String.valueOf(factory));
- builder.append(',');
- builder.append(poolSize);
- builder.append(')');
- return builder.toString();
- }
-
-
-
- /**
- * Provide a finalizer because connection pools are expensive resources to
- * accidentally leave around. Also, since they won't be created all that
- * frequently, there's little risk of overloading the finalizer.
- */
- @Override
- protected void finalize() throws Throwable
- {
- close();
- }
-
-
-
- private void closeConnection(final Connection connection)
- {
- // The connection will be closed, so decrease the pool size.
- currentPoolSize.release();
- connection.close();
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.fine(String.format(
- "Closing connection because connection pool is closing: "
- + " currentPoolSize=%d, poolSize=%d",
- poolSize - currentPoolSize.availablePermits(), poolSize));
- }
- }
-
-
-
- private void publishConnection(final Connection connection)
- {
- final QueueElement holder;
- boolean connectionPoolIsClosing = false;
-
- synchronized (queue)
- {
- if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
- {
- if (isClosed)
- {
- connectionPoolIsClosing = true;
- holder = null;
+ synchronized (queue) {
+ if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) {
+ if (isClosed) {
+ connectionPoolIsClosing = true;
+ holder = null;
+ } else {
+ holder = new QueueElement(connection);
+ queue.add(holder);
+ return;
+ }
+ } else {
+ connectionPoolIsClosing = isClosed;
+ holder = queue.removeFirst();
+ }
}
- else
- {
- holder = new QueueElement(connection);
- queue.add(holder);
- return;
+
+ // There was waiting future, so complete it.
+ if (connectionPoolIsClosing) {
+ closeConnection(connection);
+
+ if (holder != null) {
+ final ErrorResultException e =
+ ErrorResultException.newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED,
+ ERR_CONNECTION_POOL_CLOSING.get(toString()).toString());
+ holder.getWaitingFuture().handleErrorResult(e);
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
+ StaticUtils.DEBUG_LOG.fine(String.format("Connection attempt failed: "
+ + e.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize
+ - currentPoolSize.availablePermits(), poolSize));
+ }
+ }
+ } else {
+ final PooledConnection pooledConnection = new PooledConnection(connection);
+ holder.getWaitingFuture().handleResult(pooledConnection);
}
- }
- else
- {
- connectionPoolIsClosing = isClosed;
- holder = queue.removeFirst();
- }
}
- // There was waiting future, so complete it.
- if (connectionPoolIsClosing)
- {
- closeConnection(connection);
-
- if (holder != null)
- {
- final ErrorResultException e = ErrorResultException.newErrorResult(
- ResultCode.CLIENT_SIDE_USER_CANCELLED, ERR_CONNECTION_POOL_CLOSING
- .get(toString()).toString());
- holder.getWaitingFuture().handleErrorResult(e);
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.fine(String.format(
- "Connection attempt failed: " + e.getMessage()
- + " currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
- }
- }
- }
- else
- {
- final PooledConnection pooledConnection = new PooledConnection(connection);
- holder.getWaitingFuture().handleResult(pooledConnection);
- }
- }
-
}
--
Gitblit v1.10.0