From 6e60c3e9b573d818a8e2f20b9b8d17bf92245799 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 12 Aug 2011 22:13:21 +0000
Subject: [PATCH] Fix OPENDJ-177: Make connection pools Closeable
---
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties | 2
opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java | 139 +++
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java | 944 ++++++++++++++++++++++++++++
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java | 834 +-----------------------
opendj-sdk/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/proxy/Main.java | 4
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java | 5
opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java | 18
7 files changed, 1,149 insertions(+), 797 deletions(-)
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/proxy/Main.java b/opendj-sdk/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/proxy/Main.java
index 2d35f6f..ae2d8f7 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/proxy/Main.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/proxy/Main.java
@@ -559,10 +559,10 @@
final String remoteAddress = args[i];
final int remotePort = Integer.parseInt(args[i + 1]);
- factories.add(Connections.newConnectionPool(
+ factories.add(Connections.newFixedConnectionPool(
new LDAPConnectionFactory(remoteAddress, remotePort),
Integer.MAX_VALUE));
- bindFactories.add(Connections.newConnectionPool(
+ bindFactories.add(Connections.newFixedConnectionPool(
new LDAPConnectionFactory(remoteAddress, remotePort),
Integer.MAX_VALUE));
}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
index 9984939c..ac686e4 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
@@ -22,806 +22,86 @@
* CDDL HEADER END
*
*
- * Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions copyright 2011 ForgeRock AS
+ * Copyright 2011 ForgeRock AS
*/
package org.forgerock.opendj.ldap;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-
-import org.forgerock.opendj.ldap.requests.*;
-import org.forgerock.opendj.ldap.responses.*;
-
-import com.forgerock.opendj.util.*;
+import java.io.Closeable;
/**
- * A simple connection pool implementation.
+ * A connection factory which maintains and re-uses a pool of connections.
+ * Connections obtained from a connection pool are returned to the connection
+ * pool when closed, although connection pool implementations may choose to
+ * physically close the connection if needed (e.g. in order to reduce the size
+ * of the pool).
+ * <p>
+ * When connection pools are no longer needed they must be explicitly closed in
+ * order to close any remaining pooled connections.
+ * <p>
+ * Since pooled connections are re-used, applications must use operations such
+ * as binds and StartTLS with extreme caution.
*/
-final class ConnectionPool extends AbstractConnectionFactory
+public interface ConnectionPool extends ConnectionFactory, Closeable
{
-
/**
- * This result handler is invoked when an attempt to add a new connection to
- * the pool completes.
+ * Releases any resources associated with this connection pool. Pooled
+ * connections will be permanently closed and this connection pool will no
+ * longer be available for use.
+ * <p>
+ * Attempts to use this connection pool after it has been closed will result
+ * in an {@code IllegalStateException}.
+ * <p>
+ * Calling {@code close} on a connection pool which is already closed has no
+ * effect.
*/
- private final class ConnectionResultHandler implements
- ResultHandler<AsynchronousConnection>
- {
- /**
- * {@inheritDoc}
- */
- @Override
- public void handleErrorResult(final ErrorResultException error)
- {
- // Connection attempt failed, so decrease the pool size.
- currentPoolSize.release();
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.fine(String.format(
- "Connection attempt failed: " + error.getMessage()
- + " currentPoolSize=%d, poolSize=%d",
- poolSize - currentPoolSize.availablePermits(), poolSize));
- }
-
- QueueElement holder;
- synchronized (queue)
- {
- if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
- {
- // No waiting futures.
- return;
- }
- else
- {
- holder = queue.removeFirst();
- }
- }
-
- // There was waiting future, so close it.
- holder.getWaitingFuture().handleErrorResult(error);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void handleResult(final AsynchronousConnection connection)
- {
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.fine(String.format(
- "Connection attempt succeeded: "
- + " currentPoolSize=%d, poolSize=%d",
- poolSize - currentPoolSize.availablePermits(), poolSize));
- }
-
- publishConnection(connection);
- }
- }
+ void close();
/**
- * A pooled connection is passed to the client. It wraps an underlying
- * "pooled" connection obtained from the underlying factory and lasts until
- * the client application closes this connection. More specifically, pooled
- * connections are not actually stored in the internal queue.
- */
- private final class PooledConnection implements AsynchronousConnection
- {
- // Connection event listeners registed against this pooled connection should
- // have the same life time as the pooled connection.
- private final List<ConnectionEventListener> listeners =
- new CopyOnWriteArrayList<ConnectionEventListener>();
-
- private final AsynchronousConnection connection;
-
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
-
-
- PooledConnection(final AsynchronousConnection connection)
- {
- this.connection = connection;
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<Void> abandon(final AbandonRequest request)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.abandon(request);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<Result> add(final AddRequest request,
- final ResultHandler<? super Result> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.add(request, handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<Result> add(final AddRequest request,
- final ResultHandler<? super Result> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection
- .add(request, resultHandler, intermediateResponseHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void addConnectionEventListener(
- final ConnectionEventListener listener) throws IllegalStateException,
- NullPointerException
- {
- Validator.ensureNotNull(listener);
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- listeners.add(listener);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<BindResult> bind(final BindRequest request,
- final ResultHandler<? super BindResult> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.bind(request, handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<BindResult> bind(final BindRequest request,
- final ResultHandler<? super BindResult> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.bind(request, resultHandler,
- intermediateResponseHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void close()
- {
- if (!isClosed.compareAndSet(false, true))
- {
- // Already closed.
- return;
- }
-
- // Don't put invalid connections back in the pool.
- if (connection.isValid())
- {
- publishConnection(connection);
- }
- else
- {
- // The connection may have been disconnected by the remote server, but
- // the server may still be available. In order to avoid leaving pending
- // futures hanging indefinitely, we should try to reconnect immediately.
-
- // Close the dead connection.
- connection.close();
-
- // Try to get a new connection to replace it.
- factory.getAsynchronousConnection(connectionResultHandler);
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
- {
- StaticUtils.DEBUG_LOG.warning(String.format(
- "Connection no longer valid. "
- + "currentPoolSize=%d, poolSize=%d",
- poolSize - currentPoolSize.availablePermits(), poolSize));
- }
- }
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void close(final UnbindRequest request, final String reason)
- throws NullPointerException
- {
- close();
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<CompareResult> compare(final CompareRequest request,
- final ResultHandler<? super CompareResult> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.compare(request, handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<CompareResult> compare(final CompareRequest request,
- final ResultHandler<? super CompareResult> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.compare(request, resultHandler,
- intermediateResponseHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<Result> delete(final DeleteRequest request,
- final ResultHandler<? super Result> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.delete(request, handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<Result> delete(final DeleteRequest request,
- final ResultHandler<? super Result> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.delete(request, resultHandler,
- intermediateResponseHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <R extends ExtendedResult> FutureResult<R> extendedRequest(
- final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.extendedRequest(request, handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public <R extends ExtendedResult> FutureResult<R> extendedRequest(
- final ExtendedRequest<R> request,
- final ResultHandler<? super R> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.extendedRequest(request, resultHandler,
- intermediateResponseHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Connection getSynchronousConnection()
- {
- return new SynchronousConnection(this);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isClosed()
- {
- return isClosed.get();
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isValid()
- {
- return connection.isValid() && !isClosed();
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<Result> modify(final ModifyRequest request,
- final ResultHandler<? super Result> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.modify(request, handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<Result> modify(final ModifyRequest request,
- final ResultHandler<? super Result> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.modify(request, resultHandler,
- intermediateResponseHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<Result> modifyDN(final ModifyDNRequest request,
- final ResultHandler<? super Result> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.modifyDN(request, handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<Result> modifyDN(final ModifyDNRequest request,
- final ResultHandler<? super Result> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.modifyDN(request, resultHandler,
- intermediateResponseHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<SearchResultEntry> readEntry(final DN name,
- final Collection<String> attributeDescriptions,
- final ResultHandler<? super SearchResultEntry> resultHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.readEntry(name, attributeDescriptions, resultHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void removeConnectionEventListener(
- final ConnectionEventListener listener) throws NullPointerException
- {
- Validator.ensureNotNull(listener);
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- listeners.remove(listener);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<Result> search(final SearchRequest request,
- final SearchResultHandler handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.search(request, handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<Result> search(final SearchRequest request,
- final SearchResultHandler resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.search(request, resultHandler,
- intermediateResponseHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public FutureResult<SearchResultEntry> searchSingleEntry(
- final SearchRequest request,
- final ResultHandler<? super SearchResultEntry> resultHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.searchSingleEntry(request, resultHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String toString()
- {
- final StringBuilder builder = new StringBuilder();
- builder.append("PooledConnection(");
- builder.append(connection);
- builder.append(')');
- return builder.toString();
- }
- }
-
-
-
- /**
- * A queue element is either a pending connection request future awaiting an
- * {@code AsynchronousConnection} or it is an unused
- * {@code AsynchronousConnection} awaiting a connection request.
- */
- private static final class QueueElement
- {
- private final Object value;
-
-
-
- QueueElement(final AsynchronousConnection connection)
- {
- this.value = connection;
- }
-
-
-
- QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
- {
- this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
- }
-
-
-
- AsynchronousConnection getWaitingConnection()
- {
- if (value instanceof AsynchronousConnection)
- {
- return (AsynchronousConnection) value;
- }
- else
- {
- throw new IllegalStateException();
- }
- }
-
-
-
- @SuppressWarnings("unchecked")
- AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
- {
- if (value instanceof AsynchronousFutureResult)
- {
- return (AsynchronousFutureResult<AsynchronousConnection>) value;
- }
- else
- {
- throw new IllegalStateException();
- }
- }
-
-
-
- boolean isWaitingFuture()
- {
- return value instanceof AsynchronousFutureResult;
- }
-
-
-
- public String toString()
- {
- return String.valueOf(value);
- }
- }
-
-
-
- // Guarded by queue.
- private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
-
- private final ConnectionFactory factory;
-
- private final int poolSize;
-
- private final Semaphore currentPoolSize;
-
- private final ResultHandler<AsynchronousConnection> connectionResultHandler =
- new ConnectionResultHandler();
-
-
-
- /**
- * Creates a new connection pool which will maintain {@code poolSize}
- * connections created using the provided connection factory.
+ * Obtains an asynchronous connection from this connection pool, potentially
+ * opening a new connection if needed.
+ * <p>
+ * The returned {@code FutureResult} can be used to retrieve the pooled
+ * asynchronous connection. Alternatively, if a {@code ResultHandler} is
+ * provided, the handler will be notified when the pooled connection is
+ * available and ready for use.
+ * <p>
+ * Closing the pooled connection will, depending on the connection pool
+ * implementation, return the connection to this pool without closing it.
*
- * @param factory
- * The connection factory to use for creating new connections.
- * @param poolSize
- * The maximum size of the connection pool.
+ * @param handler
+ * The completion handler, or {@code null} if no handler is to be
+ * used.
+ * @return A future which can be used to retrieve the pooled asynchronous
+ * connection.
+ * @throws IllegalStateException
+ * If this connection pool has already been closed.
*/
- ConnectionPool(final ConnectionFactory factory, final int poolSize)
- {
- this.factory = factory;
- this.poolSize = poolSize;
- this.currentPoolSize = new Semaphore(poolSize);
- }
+ FutureResult<AsynchronousConnection> getAsynchronousConnection(
+ ResultHandler<? super AsynchronousConnection> handler);
/**
- * {@inheritDoc}
+ * Obtains a connection from this connection pool, potentially opening a new
+ * connection if needed.
+ * <p>
+ * Closing the pooled connection will, depending on the connection pool
+ * implementation, return the connection to this pool without closing it.
+ *
+ * @return A pooled connection.
+ * @throws ErrorResultException
+ * If the connection request failed for some reason.
+ * @throws InterruptedException
+ * If the current thread was interrupted while waiting.
+ * @throws IllegalStateException
+ * If this connection pool has already been closed.
*/
- @Override
- public FutureResult<AsynchronousConnection> getAsynchronousConnection(
- final ResultHandler<? super AsynchronousConnection> handler)
- {
- QueueElement holder;
- synchronized (queue)
- {
- if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
- {
- holder = new QueueElement(handler);
- queue.add(holder);
- }
- else
- {
- holder = queue.removeFirst();
- }
- }
-
- if (!holder.isWaitingFuture())
- {
- // There was a completed connection attempt.
- final AsynchronousConnection connection = holder.getWaitingConnection();
- final PooledConnection pooledConnection = new PooledConnection(connection);
- if (handler != null)
- {
- handler.handleResult(pooledConnection);
- }
- return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
- }
- else
- {
- // Grow the pool if needed.
- final FutureResult<AsynchronousConnection> future = holder
- .getWaitingFuture();
- if (!future.isDone() && currentPoolSize.tryAcquire())
- {
- factory.getAsynchronousConnection(connectionResultHandler);
- }
- return future;
- }
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String toString()
- {
- final StringBuilder builder = new StringBuilder();
- builder.append("ConnectionPool(");
- builder.append(String.valueOf(factory));
- builder.append(',');
- builder.append(poolSize);
- builder.append(')');
- return builder.toString();
- }
-
-
-
- private void publishConnection(final AsynchronousConnection connection)
- {
- QueueElement holder;
- synchronized (queue)
- {
- if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
- {
- holder = new QueueElement(connection);
- queue.add(holder);
- return;
- }
- else
- {
- holder = queue.removeFirst();
- }
- }
-
- // There was waiting future, so close it.
- final PooledConnection pooledConnection = new PooledConnection(connection);
- holder.getWaitingFuture().handleResult(pooledConnection);
- }
+ Connection getConnection() throws ErrorResultException, InterruptedException;
}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
index 7b1c24c..0429693 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
+ * Portions copyright 2011 ForgeRock AS
*/
package org.forgerock.opendj.ldap;
@@ -92,13 +93,13 @@
* @throws NullPointerException
* If {@code factory} was {@code null}.
*/
- public static ConnectionFactory newConnectionPool(
+ public static ConnectionPool newFixedConnectionPool(
final ConnectionFactory factory, final int poolSize)
throws IllegalArgumentException, NullPointerException
{
Validator.ensureNotNull(factory);
Validator.ensureTrue(poolSize >= 0, "negative pool size");
- return new ConnectionPool(factory, poolSize);
+ return new FixedConnectionPool(factory, poolSize);
}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
new file mode 100644
index 0000000..f20244f
--- /dev/null
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -0,0 +1,944 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opendj3/legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opendj3/legal-notices/CDDLv1_0.txt. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2009-2010 Sun Microsystems, Inc.
+ * Portions copyright 2011 ForgeRock AS
+ */
+
+package org.forgerock.opendj.ldap;
+
+
+
+import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+
+import org.forgerock.opendj.ldap.requests.*;
+import org.forgerock.opendj.ldap.responses.*;
+
+import com.forgerock.opendj.util.*;
+
+
+
+/**
+ * A simple connection pool implementation which maintains a fixed number of
+ * connections.
+ */
+final class FixedConnectionPool extends AbstractConnectionFactory implements
+ ConnectionPool
+{
+
+ /**
+ * This result handler is invoked when an attempt to add a new connection to
+ * the pool completes.
+ */
+ private final class ConnectionResultHandler implements
+ ResultHandler<AsynchronousConnection>
+ {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleErrorResult(final ErrorResultException error)
+ {
+ // Connection attempt failed, so decrease the pool size.
+ currentPoolSize.release();
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG.fine(String.format("Connection attempt failed: "
+ + error.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize
+ - currentPoolSize.availablePermits(), poolSize));
+ }
+
+ QueueElement holder;
+ synchronized (queue)
+ {
+ if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
+ {
+ // No waiting futures.
+ return;
+ }
+ else
+ {
+ holder = queue.removeFirst();
+ }
+ }
+
+ // There was waiting future, so close it.
+ holder.getWaitingFuture().handleErrorResult(error);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG.fine(String.format(
+ "Connection attempt succeeded: "
+ + " currentPoolSize=%d, poolSize=%d", poolSize
+ - currentPoolSize.availablePermits(), poolSize));
+ }
+
+ publishConnection(connection);
+ }
+ }
+
+
+
+ /**
+ * A pooled connection is passed to the client. It wraps an underlying
+ * "pooled" connection obtained from the underlying factory and lasts until
+ * the client application closes this connection. More specifically, pooled
+ * connections are not actually stored in the internal queue.
+ */
+ private final class PooledConnection implements AsynchronousConnection
+ {
+ // Connection event listeners registed against this pooled connection should
+ // have the same life time as the pooled connection.
+ private final List<ConnectionEventListener> listeners =
+ new CopyOnWriteArrayList<ConnectionEventListener>();
+
+ private final AsynchronousConnection connection;
+
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+
+
+ PooledConnection(final AsynchronousConnection connection)
+ {
+ this.connection = connection;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<Void> abandon(final AbandonRequest request)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.abandon(request);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<Result> add(final AddRequest request,
+ final ResultHandler<? super Result> handler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.add(request, handler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<Result> add(final AddRequest request,
+ final ResultHandler<? super Result> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection
+ .add(request, resultHandler, intermediateResponseHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void addConnectionEventListener(
+ final ConnectionEventListener listener) throws IllegalStateException,
+ NullPointerException
+ {
+ Validator.ensureNotNull(listener);
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ listeners.add(listener);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<BindResult> bind(final BindRequest request,
+ final ResultHandler<? super BindResult> handler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.bind(request, handler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<BindResult> bind(final BindRequest request,
+ final ResultHandler<? super BindResult> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.bind(request, resultHandler,
+ intermediateResponseHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close()
+ {
+ if (!isClosed.compareAndSet(false, true))
+ {
+ // Already closed.
+ return;
+ }
+
+ // Don't put invalid connections back in the pool.
+ if (connection.isValid())
+ {
+ publishConnection(connection);
+ }
+ else
+ {
+ // The connection may have been disconnected by the remote server, but
+ // the server may still be available. In order to avoid leaving pending
+ // futures hanging indefinitely, we should try to reconnect immediately.
+
+ // Close the dead connection.
+ connection.close();
+
+ // Try to get a new connection to replace it.
+ factory.getAsynchronousConnection(connectionResultHandler);
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+ {
+ StaticUtils.DEBUG_LOG.warning(String.format(
+ "Connection no longer valid. "
+ + "currentPoolSize=%d, poolSize=%d", poolSize
+ - currentPoolSize.availablePermits(), poolSize));
+ }
+ }
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close(final UnbindRequest request, final String reason)
+ throws NullPointerException
+ {
+ close();
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<CompareResult> compare(final CompareRequest request,
+ final ResultHandler<? super CompareResult> handler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.compare(request, handler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<CompareResult> compare(final CompareRequest request,
+ final ResultHandler<? super CompareResult> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.compare(request, resultHandler,
+ intermediateResponseHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<Result> delete(final DeleteRequest request,
+ final ResultHandler<? super Result> handler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.delete(request, handler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<Result> delete(final DeleteRequest request,
+ final ResultHandler<? super Result> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.delete(request, resultHandler,
+ intermediateResponseHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public <R extends ExtendedResult> FutureResult<R> extendedRequest(
+ final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.extendedRequest(request, handler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public <R extends ExtendedResult> FutureResult<R> extendedRequest(
+ final ExtendedRequest<R> request,
+ final ResultHandler<? super R> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.extendedRequest(request, resultHandler,
+ intermediateResponseHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Connection getSynchronousConnection()
+ {
+ return new SynchronousConnection(this);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isClosed()
+ {
+ return isClosed.get();
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isValid()
+ {
+ return connection.isValid() && !isClosed();
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<Result> modify(final ModifyRequest request,
+ final ResultHandler<? super Result> handler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.modify(request, handler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<Result> modify(final ModifyRequest request,
+ final ResultHandler<? super Result> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.modify(request, resultHandler,
+ intermediateResponseHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<Result> modifyDN(final ModifyDNRequest request,
+ final ResultHandler<? super Result> handler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.modifyDN(request, handler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<Result> modifyDN(final ModifyDNRequest request,
+ final ResultHandler<? super Result> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.modifyDN(request, resultHandler,
+ intermediateResponseHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<SearchResultEntry> readEntry(final DN name,
+ final Collection<String> attributeDescriptions,
+ final ResultHandler<? super SearchResultEntry> resultHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.readEntry(name, attributeDescriptions, resultHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void removeConnectionEventListener(
+ final ConnectionEventListener listener) throws NullPointerException
+ {
+ Validator.ensureNotNull(listener);
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ listeners.remove(listener);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<Result> search(final SearchRequest request,
+ final SearchResultHandler handler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.search(request, handler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<Result> search(final SearchRequest request,
+ final SearchResultHandler resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.search(request, resultHandler,
+ intermediateResponseHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<SearchResultEntry> searchSingleEntry(
+ final SearchRequest request,
+ final ResultHandler<? super SearchResultEntry> resultHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.searchSingleEntry(request, resultHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString()
+ {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("PooledConnection(");
+ builder.append(connection);
+ builder.append(')');
+ return builder.toString();
+ }
+ }
+
+
+
+ /**
+ * A queue element is either a pending connection request future awaiting an
+ * {@code AsynchronousConnection} or it is an unused
+ * {@code AsynchronousConnection} awaiting a connection request.
+ */
+ private static final class QueueElement
+ {
+ private final Object value;
+
+
+
+ QueueElement(final AsynchronousConnection connection)
+ {
+ this.value = connection;
+ }
+
+
+
+ QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
+ {
+ this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
+ }
+
+
+
+ @Override
+ public String toString()
+ {
+ return String.valueOf(value);
+ }
+
+
+
+ AsynchronousConnection getWaitingConnection()
+ {
+ if (value instanceof AsynchronousConnection)
+ {
+ return (AsynchronousConnection) value;
+ }
+ else
+ {
+ throw new IllegalStateException();
+ }
+ }
+
+
+
+ @SuppressWarnings("unchecked")
+ AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
+ {
+ if (value instanceof AsynchronousFutureResult)
+ {
+ return (AsynchronousFutureResult<AsynchronousConnection>) value;
+ }
+ else
+ {
+ throw new IllegalStateException();
+ }
+ }
+
+
+
+ boolean isWaitingFuture()
+ {
+ return value instanceof AsynchronousFutureResult;
+ }
+ }
+
+
+
+ // Guarded by queue.
+ private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
+
+ // Guarded by queue.
+ private boolean isClosed = false;
+
+ private final ConnectionFactory factory;
+
+ private final int poolSize;
+
+ private final Semaphore currentPoolSize;
+
+ private final ResultHandler<AsynchronousConnection> connectionResultHandler =
+ new ConnectionResultHandler();
+
+
+
+ /**
+ * Creates a new connection pool which will maintain {@code poolSize}
+ * connections created using the provided connection factory.
+ *
+ * @param factory
+ * The connection factory to use for creating new connections.
+ * @param poolSize
+ * The maximum size of the connection pool.
+ */
+ FixedConnectionPool(final ConnectionFactory factory, final int poolSize)
+ {
+ this.factory = factory;
+ this.poolSize = poolSize;
+ this.currentPoolSize = new Semaphore(poolSize);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close()
+ {
+ final LinkedList<AsynchronousConnection> idleConnections;
+ synchronized (queue)
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ isClosed = true;
+
+ // Remove any connections which are waiting in the queue as these can be
+ // closed immediately.
+ idleConnections = new LinkedList<AsynchronousConnection>();
+ while (!queue.isEmpty() && !queue.getFirst().isWaitingFuture())
+ {
+ final QueueElement holder = queue.removeFirst();
+ idleConnections.add(holder.getWaitingConnection());
+ }
+ }
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG.fine(String.format(
+ "Connection pool is closing: currentPoolSize=%d, poolSize=%d",
+ poolSize - currentPoolSize.availablePermits(), poolSize));
+ }
+
+ // Close the idle connections.
+ for (final AsynchronousConnection connection : idleConnections)
+ {
+ closeConnection(connection);
+ }
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<AsynchronousConnection> getAsynchronousConnection(
+ final ResultHandler<? super AsynchronousConnection> handler)
+ {
+ QueueElement holder;
+ synchronized (queue)
+ {
+ if (isClosed)
+ {
+ throw new IllegalStateException("FixedConnectionPool is already closed");
+ }
+
+ if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
+ {
+ holder = new QueueElement(handler);
+ queue.add(holder);
+ }
+ else
+ {
+ holder = queue.removeFirst();
+ }
+ }
+
+ if (!holder.isWaitingFuture())
+ {
+ // There was a completed connection attempt.
+ final AsynchronousConnection connection = holder.getWaitingConnection();
+ final PooledConnection pooledConnection = new PooledConnection(connection);
+ if (handler != null)
+ {
+ handler.handleResult(pooledConnection);
+ }
+ return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
+ }
+ else
+ {
+ // Grow the pool if needed.
+ final FutureResult<AsynchronousConnection> future = holder
+ .getWaitingFuture();
+ if (!future.isDone() && currentPoolSize.tryAcquire())
+ {
+ factory.getAsynchronousConnection(connectionResultHandler);
+ }
+ return future;
+ }
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString()
+ {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("FixedConnectionPool(");
+ builder.append(String.valueOf(factory));
+ builder.append(',');
+ builder.append(poolSize);
+ builder.append(')');
+ return builder.toString();
+ }
+
+
+
+ /**
+ * Provide a finalizer because connection pools are expensive resources to
+ * accidentally leave around. Also, since they won't be created all that
+ * frequently, there's little risk of overloading the finalizer.
+ */
+ @Override
+ protected void finalize() throws Throwable
+ {
+ close();
+ }
+
+
+
+ private void closeConnection(final AsynchronousConnection connection)
+ {
+ // The connection will be closed, so decrease the pool size.
+ currentPoolSize.release();
+ connection.close();
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG.fine(String.format(
+ "Closing connection because connection pool is closing: "
+ + " currentPoolSize=%d, poolSize=%d",
+ poolSize - currentPoolSize.availablePermits(), poolSize));
+ }
+ }
+
+
+
+ private void publishConnection(final AsynchronousConnection connection)
+ {
+ final QueueElement holder;
+ boolean connectionPoolIsClosing = false;
+
+ synchronized (queue)
+ {
+ if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
+ {
+ if (isClosed)
+ {
+ connectionPoolIsClosing = true;
+ holder = null;
+ }
+ else
+ {
+ holder = new QueueElement(connection);
+ queue.add(holder);
+ return;
+ }
+ }
+ else
+ {
+ connectionPoolIsClosing = isClosed;
+ holder = queue.removeFirst();
+ }
+ }
+
+ // There was waiting future, so complete it.
+ if (connectionPoolIsClosing)
+ {
+ closeConnection(connection);
+
+ if (holder != null)
+ {
+ final ErrorResultException e = ErrorResultException.newErrorResult(
+ ResultCode.CLIENT_SIDE_USER_CANCELLED, ERR_CONNECTION_POOL_CLOSING
+ .get(toString()).toString());
+ holder.getWaitingFuture().handleErrorResult(e);
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG.fine(String.format(
+ "Connection attempt failed: " + e.getMessage()
+ + " currentPoolSize=%d, poolSize=%d", poolSize
+ - currentPoolSize.availablePermits(), poolSize));
+ }
+ }
+ }
+ else
+ {
+ final PooledConnection pooledConnection = new PooledConnection(connection);
+ holder.getWaitingFuture().handleResult(pooledConnection);
+ }
+ }
+}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
index 0c3d7fd..7eb1587 100755
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
@@ -1385,3 +1385,5 @@
WARN_ATTR_SYNTAX_DSR_INVALID_SUPERIOR_RULE=The definition for \
the DIT structure rule "%s" declared a superior rule "%s" which has been \
removed from the schema because it is invalid
+ERR_CONNECTION_POOL_CLOSING=No connection could be obtained from connection \
+ pool "%s" because it is closing
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
index 0ba30b8..3ab0141 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -33,11 +33,14 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import javax.net.ssl.SSLContext;
@@ -48,11 +51,14 @@
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.schema.Schema;
import org.forgerock.opendj.ldap.schema.SchemaBuilder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import com.forgerock.opendj.util.CompletedFutureResult;
import com.forgerock.opendj.util.StaticUtils;
@@ -205,7 +211,7 @@
"online");
// Connection pools.
- factories[7][0] = Connections.newConnectionPool(onlineServer, 10);
+ factories[7][0] = Connections.newFixedConnectionPool(onlineServer, 10);
// Round robin.
factories[8][0] = Connections
@@ -220,9 +226,10 @@
.newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList(
offlineServer1, offlineServer2, onlineServer)));
factories[13][0] = Connections
- .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList(
- Connections.newConnectionPool(offlineServer1, 10),
- Connections.newConnectionPool(onlineServer, 10))));
+ .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays
+ .<ConnectionFactory> asList(
+ Connections.newFixedConnectionPool(offlineServer1, 10),
+ Connections.newFixedConnectionPool(onlineServer, 10))));
// Fail-over.
factories[14][0] = Connections
@@ -237,11 +244,12 @@
.newLoadBalancer(new FailoverLoadBalancingAlgorithm(Arrays.asList(
offlineServer1, offlineServer2, onlineServer)));
factories[19][0] = Connections
- .newLoadBalancer(new FailoverLoadBalancingAlgorithm(Arrays.asList(
- Connections.newConnectionPool(offlineServer1, 10),
- Connections.newConnectionPool(onlineServer, 10))));
+ .newLoadBalancer(new FailoverLoadBalancingAlgorithm(Arrays
+ .<ConnectionFactory> asList(
+ Connections.newFixedConnectionPool(offlineServer1, 10),
+ Connections.newFixedConnectionPool(onlineServer, 10))));
- factories[20][0] = Connections.newConnectionPool(onlineServer, 10);
+ factories[20][0] = Connections.newFixedConnectionPool(onlineServer, 10);
return factories;
}
@@ -367,4 +375,119 @@
connection.close();
}
}
+
+
+
+ /**
+ * Tests connection pool closure.
+ *
+ * @throws Exception If an unexpected exception occurred.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testConnectionPoolClose() throws Exception
+ {
+ // We'll use a pool of 4 connections.
+ final int SIZE = 4;
+
+ // Count number of real connections which are open.
+ final AtomicInteger realConnectionCount = new AtomicInteger();
+ final boolean[] realConnectionIsClosed = new boolean[SIZE];
+ Arrays.fill(realConnectionIsClosed, true);
+
+ // Mock underlying connection factory which always succeeds.
+ final ConnectionFactory mockFactory = mock(ConnectionFactory.class);
+ when(mockFactory.getAsynchronousConnection(any(ResultHandler.class)))
+ .thenAnswer(new Answer<FutureResult<AsynchronousConnection>>()
+ {
+
+ public FutureResult<AsynchronousConnection> answer(
+ InvocationOnMock invocation) throws Throwable
+ {
+ // Update state.
+ final int connectionID = realConnectionCount.getAndIncrement();
+ realConnectionIsClosed[connectionID] = false;
+
+ // Mock connection decrements counter on close.
+ AsynchronousConnection mockConnection = mock(AsynchronousConnection.class);
+ doAnswer(new Answer<Void>()
+ {
+ public Void answer(InvocationOnMock invocation) throws Throwable
+ {
+ realConnectionCount.decrementAndGet();
+ realConnectionIsClosed[connectionID] = true;
+ return null;
+ }
+ }).when(mockConnection).close();
+ when(mockConnection.isValid()).thenReturn(true);
+ when(mockConnection.toString()).thenReturn("Mock connection " + connectionID);
+
+ // Excecute handler and return future.
+ ResultHandler<? super AsynchronousConnection> handler =
+ (ResultHandler<? super AsynchronousConnection>) invocation.getArguments()[0];
+ if (handler != null)
+ {
+ handler.handleResult(mockConnection);
+ }
+ return new CompletedFutureResult<AsynchronousConnection>(
+ mockConnection);
+ }
+ });
+
+ ConnectionPool pool = Connections.newFixedConnectionPool(mockFactory, SIZE);
+ Connection[] pooledConnections = new Connection[SIZE];
+ for (int i = 0; i < SIZE; i++)
+ {
+ pooledConnections[i] = pool.getConnection();
+ }
+
+ // Pool is fully utilized.
+ assertThat(realConnectionCount.get()).isEqualTo(SIZE);
+ assertThat(pooledConnections[0].isClosed()).isFalse();
+ assertThat(pooledConnections[1].isClosed()).isFalse();
+ assertThat(pooledConnections[2].isClosed()).isFalse();
+ assertThat(pooledConnections[3].isClosed()).isFalse();
+ assertThat(realConnectionIsClosed[0]).isFalse();
+ assertThat(realConnectionIsClosed[1]).isFalse();
+ assertThat(realConnectionIsClosed[2]).isFalse();
+ assertThat(realConnectionIsClosed[3]).isFalse();
+
+ // Release two connections.
+ pooledConnections[0].close();
+ pooledConnections[1].close();
+ assertThat(realConnectionCount.get()).isEqualTo(4);
+ assertThat(pooledConnections[0].isClosed()).isTrue();
+ assertThat(pooledConnections[1].isClosed()).isTrue();
+ assertThat(pooledConnections[2].isClosed()).isFalse();
+ assertThat(pooledConnections[3].isClosed()).isFalse();
+ assertThat(realConnectionIsClosed[0]).isFalse();
+ assertThat(realConnectionIsClosed[1]).isFalse();
+ assertThat(realConnectionIsClosed[2]).isFalse();
+ assertThat(realConnectionIsClosed[3]).isFalse();
+
+ // Close the pool closing the two connections immediately.
+ pool.close();
+ assertThat(realConnectionCount.get()).isEqualTo(2);
+ assertThat(pooledConnections[0].isClosed()).isTrue();
+ assertThat(pooledConnections[1].isClosed()).isTrue();
+ assertThat(pooledConnections[2].isClosed()).isFalse();
+ assertThat(pooledConnections[3].isClosed()).isFalse();
+ assertThat(realConnectionIsClosed[0]).isTrue();
+ assertThat(realConnectionIsClosed[1]).isTrue();
+ assertThat(realConnectionIsClosed[2]).isFalse();
+ assertThat(realConnectionIsClosed[3]).isFalse();
+
+ // Release two remaining connections and check that they get closed.
+ pooledConnections[2].close();
+ pooledConnections[3].close();
+ assertThat(realConnectionCount.get()).isEqualTo(0);
+ assertThat(pooledConnections[0].isClosed()).isTrue();
+ assertThat(pooledConnections[1].isClosed()).isTrue();
+ assertThat(pooledConnections[2].isClosed()).isTrue();
+ assertThat(pooledConnections[3].isClosed()).isTrue();
+ assertThat(realConnectionIsClosed[0]).isTrue();
+ assertThat(realConnectionIsClosed[1]).isTrue();
+ assertThat(realConnectionIsClosed[2]).isTrue();
+ assertThat(realConnectionIsClosed[3]).isTrue();
+ }
}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
index a98ce81..fe89608 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
@@ -604,10 +604,11 @@
// Round robin.
final ConnectionFactory loadBalancer = Connections
- .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList(
- Connections.newConnectionPool(offlineServer1, 10),
- Connections.newConnectionPool(offlineServer2, 10),
- Connections.newConnectionPool(onlineServer, 10))));
+ .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays
+ .<ConnectionFactory> asList(
+ Connections.newFixedConnectionPool(offlineServer1, 10),
+ Connections.newFixedConnectionPool(offlineServer2, 10),
+ Connections.newFixedConnectionPool(onlineServer, 10))));
final MockServerConnection proxyServerConnection = new MockServerConnection();
final MockServerConnectionFactory proxyServerConnectionFactory = new MockServerConnectionFactory(
@@ -699,10 +700,11 @@
// Round robin.
final ConnectionFactory loadBalancer = Connections
- .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList(
- Connections.newConnectionPool(offlineServer1, 10),
- Connections.newConnectionPool(offlineServer2, 10),
- Connections.newConnectionPool(onlineServer, 10))));
+ .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays
+ .<ConnectionFactory> asList(
+ Connections.newFixedConnectionPool(offlineServer1, 10),
+ Connections.newFixedConnectionPool(offlineServer2, 10),
+ Connections.newFixedConnectionPool(onlineServer, 10))));
final MockServerConnection proxyServerConnection = new MockServerConnection()
{
--
Gitblit v1.10.0