sdk/src/org/opends/sdk/AbstractConnection.java
@@ -254,10 +254,10 @@ { // Got more entries than expected. Result result = Responses.newResult( ResultCode.CLIENT_SIDE_LOCAL_ERROR).setDiagnosticMessage( ResultCode.CLIENT_SIDE_MORE_RESULTS_TO_RETURN).setDiagnosticMessage( ERR_UNEXPECTED_SEARCH_RESULT_ENTRIES.get(handler.entryCount) .toString()); throw new ErrorResultException(result); throw ErrorResultException.wrap(result); } else if (handler.firstReference != null) { @@ -267,7 +267,7 @@ ERR_UNEXPECTED_SEARCH_RESULT_REFERENCES.get( handler.firstReference.getURIs().iterator().next()) .toString()); throw new ErrorResultException(result); throw ErrorResultException.wrap(result); } else { sdk/src/org/opends/sdk/AbstractConnectionFactory.java
@@ -116,10 +116,10 @@ future.cancel(false); Result result = Responses.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR) Responses.newResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR) .setCause(e) .setDiagnosticMessage(e.getLocalizedMessage()); throw new ErrorResultException(result); throw ErrorResultException.wrap(result); } } } sdk/src/org/opends/sdk/AsynchronousConnection.java
@@ -32,8 +32,9 @@ import java.io.Closeable; import org.opends.sdk.requests.*; import org.opends.sdk.responses.*; import org.opends.sdk.responses.BindResult; import org.opends.sdk.responses.CompareResult; import org.opends.sdk.responses.Result; /** @@ -260,12 +261,12 @@ * @param request * The unbind request to use in the case where a physical * connection is closed. * @param reason * A reason describing why the connection was closed. * @throws NullPointerException * If {@code request} was {@code null}. */ void close(UnbindRequest request) throws NullPointerException; void close(UnbindRequest request, String reason); /** * Compares an entry in the Directory Server using the provided @@ -492,4 +493,14 @@ */ void removeConnectionEventListener(ConnectionEventListener listener) throws NullPointerException; /** * Returns <code>true</code> if the connection is closed for * <code>false</code> otherwise. * * @return <code>true</code> if the connection is closed for * <code>false</code> otherwise. */ boolean isClosed(); } sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java
@@ -105,6 +105,7 @@ ConnectionResultHandler<? super AuthenticatedAsynchronousConnection, P> handler, P p) { // TODO: bug here? if allowRebind= false then bind will never happen ConnectionFutureImpl<P> future = new ConnectionFutureImpl<P>( allowRebinds ? request : null, handler, p); future.connectFuture = parentFactory.getAsynchronousConnection( @@ -315,10 +316,10 @@ public void close(UnbindRequest request) public void close(UnbindRequest request, String reason) throws NullPointerException { connection.close(request); connection.close(request, reason); } @@ -463,6 +464,13 @@ searchResulthandler, p); } /** * {@inheritDoc} */ public boolean isClosed() { return connection.isClosed(); } } sdk/src/org/opends/sdk/CancelledException.java
New file @@ -0,0 +1,41 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009 Sun Microsystems, Inc. */ package org.opends.sdk; import org.opends.sdk.responses.Result; /** * An error result exception reporting that an operation was cancelled by the * client. The result-code dispatch in {@code ErrorResultException} returns * this type for results whose code is * {@code ResultCode.CLIENT_SIDE_USER_CANCELLED}. The constructor passes the * provided result straight to the superclass constructor. */ public class CancelledException extends ErrorResultException { public CancelledException(Result result) { super(result); } } sdk/src/org/opends/sdk/Connection.java
@@ -303,10 +303,12 @@ * @param request * The unbind request to use in the case where a physical * connection is closed. * @param reason * A reason describing why the connection was closed. * @throws NullPointerException * If {@code request} was {@code null}. */ void close(UnbindRequest request) throws NullPointerException; void close(UnbindRequest request, String reason) throws NullPointerException; sdk/src/org/opends/sdk/ConnectionException.java
New file @@ -0,0 +1,40 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009 Sun Microsystems, Inc. */ package org.opends.sdk; import org.opends.sdk.responses.Result; /** * An error result exception reporting a problem with the connection itself * rather than with an individual operation. The result-code dispatch in * {@code ErrorResultException} returns this type for results whose code is * {@code ResultCode.CLIENT_SIDE_SERVER_DOWN}, * {@code ResultCode.CLIENT_SIDE_CONNECT_ERROR}, * {@code ResultCode.CLIENT_SIDE_DECODING_ERROR} or * {@code ResultCode.CLIENT_SIDE_ENCODING_ERROR}. The constructor passes the * provided result straight to the superclass constructor. */ public class ConnectionException extends ErrorResultException { public ConnectionException(Result result) { super(result); } } sdk/src/org/opends/sdk/ErrorResultException.java
@@ -70,6 +70,24 @@ // TODO: choose type of exception based on result code (e.g. // referral). if(result.getResultCode() == ResultCode.CLIENT_SIDE_SERVER_DOWN || result.getResultCode() == ResultCode.CLIENT_SIDE_CONNECT_ERROR || result.getResultCode() == ResultCode.CLIENT_SIDE_DECODING_ERROR || result.getResultCode() == ResultCode.CLIENT_SIDE_ENCODING_ERROR) { return new ConnectionException(result); } if(result.getResultCode() == ResultCode.CLIENT_SIDE_TIMEOUT) { return new OperationTimeoutException(result); } if(result.getResultCode() == ResultCode.CLIENT_SIDE_USER_CANCELLED) { return new CancelledException(result); } return new ErrorResultException(result); } sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
New file @@ -0,0 +1,360 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.sdk;

import org.opends.sdk.responses.*;
import org.opends.sdk.requests.*;
import org.opends.sdk.util.Validator;

import java.util.ArrayList;
import java.util.List;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * A heart beat connection factory can be used to create connections
 * that send a periodic search request to a Directory Server.
 * <p>
 * Every factory starts one background thread which, once per interval,
 * sends the heart beat search on each connection that was obtained from
 * this factory and has not yet been closed or reported an error. A
 * connection whose heart beat fails with an
 * {@link OperationTimeoutException} is closed.
 */
public class HeartBeatConnectionFactory
    extends
    AbstractConnectionFactory<HeartBeatConnectionFactory.HeartBeatAsynchronousConnection>
{
  // The search request sent as the heart beat.
  private final SearchRequest heartBeat;

  // Pause between two heart beat rounds; handed to Thread.sleep(), so
  // it is expressed in milliseconds.
  private final int interval;

  // Connections which currently receive heart beats. Every access is
  // guarded by synchronizing on the list itself.
  private final List<HeartBeatAsynchronousConnection> activeConnections;

  private final ConnectionFactory<?> parentFactory;

  // Written by the shutdown hook thread and read by the heart beat
  // thread, hence volatile.
  private volatile boolean stopRequested;



  /**
   * Creates a new heart beat connection factory which uses a default
   * heart beat: a base object search of the entry with the empty DN
   * using the filter {@code (objectClass=*)} and requesting the
   * attribute {@code 1.1}.
   *
   * @param parentFactory
   *          The connection factory used to obtain the underlying
   *          connections.
   * @param interval
   *          The pause between heart beats, in milliseconds.
   */
  public HeartBeatConnectionFactory(ConnectionFactory<?> parentFactory,
      int interval)
  {
    this(parentFactory, Requests.newSearchRequest("",
        SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1"), interval);
  }



  /**
   * Creates a new heart beat connection factory which sends the
   * provided search request as the heart beat. The heart beat thread is
   * started by this constructor.
   *
   * @param parentFactory
   *          The connection factory used to obtain the underlying
   *          connections.
   * @param heartBeat
   *          The search request to send as the heart beat.
   * @param interval
   *          The pause between heart beats, in milliseconds.
   */
  public HeartBeatConnectionFactory(ConnectionFactory<?> parentFactory,
      SearchRequest heartBeat, int interval)
  {
    Validator.ensureNotNull(parentFactory, heartBeat);

    this.heartBeat = heartBeat;
    this.interval = interval;
    this.activeConnections =
        new LinkedList<HeartBeatAsynchronousConnection>();
    this.parentFactory = parentFactory;

    Runtime.getRuntime().addShutdownHook(new Thread()
    {
      @Override
      public void run()
      {
        stopRequested = true;
      }
    });

    new HeartBeatThread().start();
  }



  /**
   * An asynchronous connection that sends heart beats and supports all
   * operations. Every operation is delegated to the wrapped connection.
   */
  public final class HeartBeatAsynchronousConnection implements
      AsynchronousConnection, ConnectionEventListener,
      ResultHandler<Result, Void>
  {
    private final AsynchronousConnection connection;



    /**
     * Wraps the provided connection.
     *
     * @param connection
     *          The connection to which all operations are delegated.
     */
    public HeartBeatAsynchronousConnection(
        AsynchronousConnection connection)
    {
      this.connection = connection;
    }



    public void abandon(AbandonRequest request)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      connection.abandon(request);
    }



    public <P> ResultFuture<Result> add(AddRequest request,
        ResultHandler<Result, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.add(request, handler, p);
    }



    public <P> ResultFuture<BindResult> bind(BindRequest request,
        ResultHandler<? super BindResult, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.bind(request, handler, p);
    }



    /**
     * Stops sending heart beats on this connection and closes the
     * wrapped connection.
     */
    public void close()
    {
      deregister();
      connection.close();
    }



    /**
     * Stops sending heart beats on this connection and closes the
     * wrapped connection using the provided unbind request and reason.
     */
    public void close(UnbindRequest request, String reason)
        throws NullPointerException
    {
      deregister();
      connection.close(request, reason);
    }



    public <P> ResultFuture<CompareResult> compare(
        CompareRequest request,
        ResultHandler<? super CompareResult, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.compare(request, handler, p);
    }



    public <P> ResultFuture<Result> delete(DeleteRequest request,
        ResultHandler<Result, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.delete(request, handler, p);
    }



    public <R extends Result, P> ResultFuture<R> extendedRequest(
        ExtendedRequest<R> request, ResultHandler<? super R, P> handler,
        P p) throws UnsupportedOperationException,
        IllegalStateException, NullPointerException
    {
      return connection.extendedRequest(request, handler, p);
    }



    public <P> ResultFuture<Result> modify(ModifyRequest request,
        ResultHandler<Result, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.modify(request, handler, p);
    }



    public <P> ResultFuture<Result> modifyDN(ModifyDNRequest request,
        ResultHandler<Result, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.modifyDN(request, handler, p);
    }



    public <P> ResultFuture<Result> search(SearchRequest request,
        ResultHandler<Result, P> resultHandler,
        SearchResultHandler<P> searchResultHandler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.search(request, resultHandler,
          searchResultHandler, p);
    }



    public void addConnectionEventListener(
        ConnectionEventListener listener)
        throws IllegalStateException, NullPointerException
    {
      connection.addConnectionEventListener(listener);
    }



    public void removeConnectionEventListener(
        ConnectionEventListener listener) throws NullPointerException
    {
      connection.removeConnectionEventListener(listener);
    }



    /**
     * {@inheritDoc}
     */
    public boolean isClosed()
    {
      return connection.isClosed();
    }



    public void connectionReceivedUnsolicitedNotification(
        GenericExtendedResult notification)
    {
      // Do nothing
    }



    /**
     * Stops sending heart beats once the wrapped connection has
     * reported an error.
     */
    public void connectionErrorOccurred(
        boolean isDisconnectNotification, ErrorResultException error)
    {
      deregister();
    }



    /**
     * Invoked when a heart beat search fails. Only a time out closes
     * the connection; any other error result is ignored.
     */
    public void handleErrorResult(Void aVoid, ErrorResultException error)
    {
      // TODO: I18N
      if (error instanceof OperationTimeoutException)
      {
        close(Requests.newUnbindRequest(), "Heart beat timed out");
      }
    }



    public void handleResult(Void aVoid, Result result)
    {
      // Do nothing
    }



    // Stops listening to the wrapped connection and removes this
    // connection from the set of connections receiving heart beats.
    private void deregister()
    {
      synchronized (activeConnections)
      {
        connection.removeConnectionEventListener(this);
        activeConnections.remove(this);
      }
    }



    // Sends one heart beat; the outcome is reported to this object.
    private void sendHeartBeat()
    {
      search(heartBeat, this, null, null);
    }
  }



  /**
   * The thread which periodically sends the heart beat on every active
   * connection until a stop is requested.
   */
  private final class HeartBeatThread extends Thread
  {
    private HeartBeatThread()
    {
      super("Heart Beat Thread");

      // The stop flag is only raised by a shutdown hook, and shutdown
      // hooks do not run while a non-daemon thread is still alive, so
      // this thread must not keep the JVM running.
      setDaemon(true);
    }



    public void run()
    {
      while (!stopRequested)
      {
        synchronized (activeConnections)
        {
          // Iterate over a snapshot: a failure reported synchronously
          // on this thread removes the connection from
          // activeConnections, which would otherwise break the
          // iteration with a ConcurrentModificationException.
          for (HeartBeatAsynchronousConnection connection :
              new ArrayList<HeartBeatAsynchronousConnection>(
                  activeConnections))
          {
            connection.sendHeartBeat();
          }
        }

        try
        {
          sleep(interval);
        }
        catch (InterruptedException e)
        {
          // Ignore: the loop only ends once a stop has been requested.
        }
      }
    }
  }



  /**
   * The future returned to callers. It is also the handler registered
   * with the parent factory, so that the new connection can be wrapped
   * and registered for heart beats before the caller's handler is
   * notified.
   */
  private final class ConnectionFutureImpl<P> implements
      ConnectionFuture<HeartBeatAsynchronousConnection>,
      ConnectionResultHandler<AsynchronousConnection, Void>
  {
    private volatile HeartBeatAsynchronousConnection heartBeatConnection;

    private volatile ErrorResultException exception;

    private volatile ConnectionFuture<?> connectFuture;

    // Released on success, failure and cancellation.
    private final CountDownLatch latch = new CountDownLatch(1);

    private final ConnectionResultHandler<? super HeartBeatAsynchronousConnection, P> handler;

    private final P p;

    // Set by the cancelling thread and read by threads blocked in
    // get(), hence volatile.
    private volatile boolean cancelled;



    private ConnectionFutureImpl(
        ConnectionResultHandler<? super HeartBeatAsynchronousConnection, P> handler,
        P p)
    {
      this.handler = handler;
      this.p = p;
    }



    public boolean cancel(boolean mayInterruptIfRunning)
    {
      // Only ever move to the cancelled state: a later attempt which
      // fails must not clear the result of an earlier successful one.
      if (connectFuture.cancel(mayInterruptIfRunning))
      {
        cancelled = true;
        latch.countDown();
        return true;
      }
      return false;
    }



    public HeartBeatAsynchronousConnection get()
        throws InterruptedException, ErrorResultException
    {
      latch.await();
      return outcome();
    }



    public HeartBeatAsynchronousConnection get(long timeout,
        TimeUnit unit) throws InterruptedException, TimeoutException,
        ErrorResultException
    {
      // await() returns false when the wait timed out; without this
      // check a time out would be reported as a null connection.
      if (!latch.await(timeout, unit))
      {
        throw new TimeoutException();
      }
      return outcome();
    }



    public boolean isCancelled()
    {
      return cancelled;
    }



    public boolean isDone()
    {
      return latch.getCount() == 0;
    }



    public void handleConnection(Void v,
        AsynchronousConnection connection)
    {
      heartBeatConnection =
          new HeartBeatAsynchronousConnection(connection);
      synchronized (activeConnections)
      {
        connection.addConnectionEventListener(heartBeatConnection);
        activeConnections.add(heartBeatConnection);
      }

      if (handler != null)
      {
        handler.handleConnection(p, heartBeatConnection);
      }
      latch.countDown();
    }



    public void handleConnectionError(Void v, ErrorResultException error)
    {
      exception = error;
      if (handler != null)
      {
        handler.handleConnectionError(p, error);
      }
      latch.countDown();
    }



    // Translates the completed state into a return value or exception.
    private HeartBeatAsynchronousConnection outcome()
        throws ErrorResultException
    {
      if (cancelled)
      {
        throw new CancellationException();
      }
      if (exception != null)
      {
        throw exception;
      }
      return heartBeatConnection;
    }
  }



  /**
   * Obtains a connection from the parent factory and wraps it in a
   * connection which receives heart beats.
   *
   * @param <P>
   *          The type of the additional parameter to the handler's
   *          methods.
   * @param pConnectionResultHandler
   *          The handler notified once the connection is available or
   *          the attempt has failed; may be {@code null}.
   * @param p
   *          The additional parameter passed to the handler.
   * @return A future representing the connection attempt.
   */
  public <P> ConnectionFuture<? extends HeartBeatAsynchronousConnection> getAsynchronousConnection(
      ConnectionResultHandler<? super HeartBeatAsynchronousConnection, P> pConnectionResultHandler,
      P p)
  {
    ConnectionFutureImpl<P> future =
        new ConnectionFutureImpl<P>(pConnectionResultHandler, p);
    future.connectFuture =
        parentFactory.getAsynchronousConnection(future, null);
    return future;
  }
}
sdk/src/org/opends/sdk/OperationTimeoutException.java
New file @@ -0,0 +1,41 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009 Sun Microsystems, Inc. */ package org.opends.sdk; import org.opends.sdk.responses.Result; /** * An error result exception reporting that an operation timed out on the * client side. The result-code dispatch in {@code ErrorResultException} * returns this type for results whose code is * {@code ResultCode.CLIENT_SIDE_TIMEOUT}. The constructor passes the provided * result straight to the superclass constructor. */ public class OperationTimeoutException extends ErrorResultException { public OperationTimeoutException(Result result) { super(result); } } sdk/src/org/opends/sdk/SynchronousConnection.java
@@ -122,9 +122,10 @@ public void close(UnbindRequest request) throws NullPointerException public void close(UnbindRequest request, String reason) throws NullPointerException { connection.close(request); connection.close(request, reason); } sdk/src/org/opends/sdk/ldap/AbstractResultFutureImpl.java
@@ -38,6 +38,7 @@ import org.opends.sdk.ResultHandler; import org.opends.sdk.requests.Requests; import org.opends.sdk.responses.Result; import org.opends.sdk.responses.Responses; import org.opends.sdk.util.StaticUtils; @@ -241,11 +242,12 @@ private R get0() throws CancellationException, ErrorResultException private R get0() throws ErrorResultException { if (isCancelled()) { throw new CancellationException(); throw ErrorResultException.wrap( Responses.newResult(ResultCode.CLIENT_SIDE_USER_CANCELLED)); } else if (result.getResultCode().isExceptional()) { sdk/src/org/opends/sdk/ldap/ConnectionPool.java
New file @@ -0,0 +1,438 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.sdk.ldap;

import org.opends.sdk.*;
import org.opends.sdk.util.StaticUtils;
import org.opends.sdk.responses.Result;
import org.opends.sdk.responses.BindResult;
import org.opends.sdk.responses.CompareResult;
import org.opends.sdk.responses.GenericExtendedResult;
import org.opends.sdk.requests.*;

import java.util.concurrent.*;
import java.util.Queue;
import java.util.Stack;
import java.util.logging.Level;

/**
 * A connection factory which maintains a bounded pool of connections
 * obtained from another connection factory.
 * <p>
 * Connections handed out by the pool are wrappers: closing a wrapper
 * does not close the underlying connection but releases it, either
 * directly to the longest waiting caller or back into the pool. When
 * all connections are in use and the pool size has been reached,
 * callers are wait-listed until a connection is released.
 */
public class ConnectionPool extends
    AbstractConnectionFactory<AsynchronousConnection>
{
  private final ConnectionFactory<?> connectionFactory;

  // Number of underlying connections which are open or being opened.
  // Only modified while holding lock.
  private volatile int numConnections;

  private final int poolSize;

  // Idle connections. Only accessed while holding lock.
  private final Stack<AsynchronousConnection> pool;

  // Callers waiting for a connection to be released.
  private final ConcurrentLinkedQueue<PendingConnectionFuture<?>> pendingFutures;

  private final Object lock = new Object();



  /**
   * The connection handed out to callers. It delegates to a pooled
   * connection until it is closed, at which point the underlying
   * connection is released and this wrapper becomes unusable.
   */
  private class PooledConnectionWapper implements
      AsynchronousConnection, ConnectionEventListener
  {
    // The underlying connection, or null once this wrapper has been
    // closed. Written while holding lock, read without it.
    private volatile AsynchronousConnection connection;

    // True once the underlying connection has reported an error and
    // its slot has been given back. Guarded by lock.
    private boolean failed;



    private PooledConnectionWapper(AsynchronousConnection connection)
    {
      this.connection = connection;
      this.connection.addConnectionEventListener(this);
    }



    public void abandon(AbandonRequest request)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      checkOpen().abandon(request);
    }



    public <P> ResultFuture<Result> add(AddRequest request,
        ResultHandler<Result, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return checkOpen().add(request, handler, p);
    }



    public <P> ResultFuture<BindResult> bind(BindRequest request,
        ResultHandler<? super BindResult, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return checkOpen().bind(request, handler, p);
    }



    /**
     * Releases the underlying connection. A connection which is closed
     * or has reported an error is discarded; otherwise it is given to
     * the longest waiting caller or, if nobody is waiting, put back in
     * the pool. Closing an already closed wrapper has no effect.
     */
    public void close()
    {
      synchronized (lock)
      {
        final AsynchronousConnection released = connection;
        if (released == null)
        {
          // Already closed.
          return;
        }

        // Null out the underlying connection to prevent further use.
        connection = null;

        if (!failed)
        {
          // The next user gets its own wrapper; a wrapper left
          // registered here would react to errors which occur after
          // it was closed.
          released.removeConnectionEventListener(this);
        }

        // Don't put dead connections back in the pool.
        if (failed || released.isClosed())
        {
          if (!failed)
          {
            // No error was reported, so the slot is still reserved.
            slotReleased();
          }
          logState("Dead connection released to pool. ");
          return;
        }

        // See if there are waiters pending.
        PendingConnectionFuture<?> waiter = pendingFutures.poll();
        if (waiter != null)
        {
          waiter.connection(new PooledConnectionWapper(released));
          logState("Connection released to pool and directly "
              + "given to waiter. ");
          return;
        }

        // No waiters. Put back in pool.
        pool.push(released);
        logState("Connection released to pool. ");
      }
    }



    public void close(UnbindRequest request, String reason)
        throws NullPointerException
    {
      close();
    }



    public <P> ResultFuture<CompareResult> compare(
        CompareRequest request,
        ResultHandler<? super CompareResult, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return checkOpen().compare(request, handler, p);
    }



    public <P> ResultFuture<Result> delete(DeleteRequest request,
        ResultHandler<Result, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return checkOpen().delete(request, handler, p);
    }



    public <R extends Result, P> ResultFuture<R> extendedRequest(
        ExtendedRequest<R> request, ResultHandler<? super R, P> handler,
        P p) throws UnsupportedOperationException,
        IllegalStateException, NullPointerException
    {
      return checkOpen().extendedRequest(request, handler, p);
    }



    public <P> ResultFuture<Result> modify(ModifyRequest request,
        ResultHandler<Result, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return checkOpen().modify(request, handler, p);
    }



    public <P> ResultFuture<Result> modifyDN(ModifyDNRequest request,
        ResultHandler<Result, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return checkOpen().modifyDN(request, handler, p);
    }



    public <P> ResultFuture<Result> search(SearchRequest request,
        ResultHandler<Result, P> resultHandler,
        SearchResultHandler<P> searchResulthandler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return checkOpen().search(request, resultHandler,
          searchResulthandler, p);
    }



    public void addConnectionEventListener(
        ConnectionEventListener listener)
        throws IllegalStateException, NullPointerException
    {
      // NOTE(review): the listener is not registered anywhere, so it
      // is never notified. Registering it on the underlying connection
      // would leak it to the next user of that connection.
      checkOpen();
    }



    public void removeConnectionEventListener(
        ConnectionEventListener listener) throws NullPointerException
    {
      // Nothing to remove; see addConnectionEventListener.
      checkOpen();
    }



    /**
     * {@inheritDoc}
     */
    public boolean isClosed()
    {
      return connection == null;
    }



    public void connectionReceivedUnsolicitedNotification(
        GenericExtendedResult notification)
    {
      // Ignore
    }



    /**
     * Gives back the slot of the underlying connection as soon as it
     * reports an error. The connection itself is discarded when this
     * wrapper is closed.
     */
    public void connectionErrorOccurred(
        boolean isDisconnectNotification, ErrorResultException error)
    {
      synchronized (lock)
      {
        final AsynchronousConnection broken = connection;
        if (broken == null || failed)
        {
          // Already released, or the error was already accounted for.
          return;
        }

        failed = true;
        broken.removeConnectionEventListener(this);
        slotReleased();
        logState("Connection error occured: " + error.getMessage()
            + " ");
      }
    }



    // Returns the underlying connection, read once so that a
    // concurrent close cannot null it between check and use.
    private AsynchronousConnection checkOpen()
        throws IllegalStateException
    {
      final AsynchronousConnection current = connection;
      if (current == null)
      {
        throw new IllegalStateException();
      }
      return current;
    }
  }



  /**
   * A future for a connection which was available immediately.
   */
  public class CompletedConnectionFuture implements
      ConnectionFuture<AsynchronousConnection>
  {
    private final PooledConnectionWapper connection;



    public CompletedConnectionFuture(PooledConnectionWapper connection)
    {
      this.connection = connection;
    }



    public boolean cancel(boolean mayInterruptIfRunning)
    {
      return false;
    }



    public AsynchronousConnection get() throws InterruptedException,
        ErrorResultException
    {
      return connection;
    }



    public AsynchronousConnection get(long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException,
        ErrorResultException
    {
      return connection;
    }



    public boolean isCancelled()
    {
      return false;
    }



    public boolean isDone()
    {
      return true;
    }
  }



  /**
   * A future for a connection which is either being opened or has been
   * wait-listed until another connection is released.
   *
   * @param <P>
   *          The type of the additional parameter to the handler's
   *          methods.
   */
  public class PendingConnectionFuture<P> implements
      ConnectionFuture<AsynchronousConnection>
  {
    private volatile boolean isCancelled;

    private volatile PooledConnectionWapper connection;

    private volatile ErrorResultException err;

    private final ConnectionResultHandler<? super AsynchronousConnection, P> handler;

    private final P p;

    // Released on success, failure and cancellation.
    private final CountDownLatch latch = new CountDownLatch(1);



    public PendingConnectionFuture()
    {
      this.handler = null;
      this.p = null;
    }



    public PendingConnectionFuture(P p,
        ConnectionResultHandler<? super AsynchronousConnection, P> handler)
    {
      this.handler = handler;
      this.p = p;
    }



    /**
     * Cancels this future if it is still wait-listed. A future whose
     * connection is already being opened or has been delivered cannot
     * be cancelled.
     */
    public synchronized boolean cancel(boolean mayInterruptIfRunning)
    {
      if (!pendingFutures.remove(this))
      {
        return false;
      }
      isCancelled = true;

      // Wake up callers blocked in get(); nobody else will ever
      // complete a future which has left the wait list.
      latch.countDown();
      return true;
    }



    public AsynchronousConnection get() throws InterruptedException,
        ErrorResultException
    {
      latch.await();
      return outcome();
    }



    public AsynchronousConnection get(long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException,
        ErrorResultException
    {
      // await() returns false when the wait timed out; without this
      // check a time out would be reported as a null connection.
      if (!latch.await(timeout, unit))
      {
        throw new TimeoutException();
      }
      return outcome();
    }



    public synchronized boolean isCancelled()
    {
      return isCancelled;
    }



    public boolean isDone()
    {
      return latch.getCount() == 0;
    }



    private void connection(PooledConnectionWapper connection)
    {
      this.connection = connection;
      if (handler != null)
      {
        handler.handleConnection(p, connection);
      }
      latch.countDown();
    }



    private void error(ErrorResultException e)
    {
      this.err = e;
      if (handler != null)
      {
        handler.handleConnectionError(p, e);
      }
      latch.countDown();
    }



    // Translates the completed state into a return value or exception.
    private AsynchronousConnection outcome()
        throws ErrorResultException
    {
      if (isCancelled)
      {
        throw new CancellationException();
      }
      if (err != null)
      {
        throw err;
      }
      return connection;
    }
  }



  /**
   * Creates a new connection pool.
   *
   * @param connectionFactory
   *          The factory used to open the pooled connections.
   * @param poolSize
   *          The maximum number of connections opened at any one time.
   */
  public ConnectionPool(ConnectionFactory<?> connectionFactory,
      int poolSize)
  {
    this.connectionFactory = connectionFactory;
    this.poolSize = poolSize;
    this.pool = new Stack<AsynchronousConnection>();
    this.pendingFutures =
        new ConcurrentLinkedQueue<PendingConnectionFuture<?>>();
  }



  /**
   * Completes a pending future once the connection opened on its
   * behalf is available or the attempt has failed.
   */
  private class WrapConnectionResultHandler implements
      ConnectionResultHandler<AsynchronousConnection, Void>
  {
    private final PendingConnectionFuture<?> future;



    private WrapConnectionResultHandler(
        PendingConnectionFuture<?> future)
    {
      this.future = future;
    }



    public void handleConnection(java.lang.Void p,
        AsynchronousConnection connection)
    {
      PooledConnectionWapper pooledConnection =
          new PooledConnectionWapper(connection);
      future.connection(pooledConnection);
    }



    public void handleConnectionError(java.lang.Void p,
        ErrorResultException error)
    {
      // The slot was reserved when the attempt started; give it back,
      // otherwise every failed attempt permanently shrinks the pool.
      synchronized (lock)
      {
        slotReleased();
      }
      future.error(error);
    }
  }



  /**
   * Obtains a connection from the pool. An idle connection is returned
   * immediately; otherwise a new connection is opened if the pool size
   * has not been reached, else the caller is wait-listed.
   *
   * @param <P>
   *          The type of the additional parameter to the handler's
   *          methods.
   * @param handler
   *          The handler notified once the connection is available or
   *          the attempt has failed; may be {@code null}.
   * @param p
   *          The additional parameter passed to the handler.
   * @return A future representing the pooled connection.
   */
  public <P> ConnectionFuture<AsynchronousConnection> getAsynchronousConnection(
      ConnectionResultHandler<? super AsynchronousConnection, P> handler,
      P p)
  {
    synchronized (lock)
    {
      // Check to see if we have a connection in the pool.
      while (!pool.isEmpty())
      {
        AsynchronousConnection conn = pool.pop();
        if (conn.isClosed())
        {
          // Idle connections are not monitored, so one which died
          // while idle is only detected, and its slot freed, here.
          numConnections--;
          continue;
        }

        logState("Connection aquired from pool. ");
        PooledConnectionWapper pooledConnection =
            new PooledConnectionWapper(conn);
        if (handler != null)
        {
          handler.handleConnection(p, pooledConnection);
        }
        return new CompletedConnectionFuture(pooledConnection);
      }

      PendingConnectionFuture<P> pendingFuture =
          new PendingConnectionFuture<P>(p, handler);

      // Pool was empty. Maybe a new connection if pool size is not
      // reached.
      if (numConnections < poolSize)
      {
        openConnectionFor(pendingFuture);
        logState("New connection established and aquired. ");
      }
      else
      {
        // Have to wait.
        pendingFutures.add(pendingFuture);
        logState("No connections available. Wait-listed. ");
      }

      return pendingFuture;
    }
  }



  // Reserves a slot and starts opening a connection for the provided
  // future. The caller must hold lock.
  private void openConnectionFor(PendingConnectionFuture<?> future)
  {
    numConnections++;
    WrapConnectionResultHandler wrapHandler =
        new WrapConnectionResultHandler(future);
    connectionFactory.getAsynchronousConnection(wrapHandler, null);
  }



  // Gives back the slot of a connection which is gone and, if a caller
  // is wait-listed, uses the freed slot to open a connection for it so
  // that it does not wait for a release which may never happen. The
  // caller must hold lock.
  private void slotReleased()
  {
    numConnections--;
    PendingConnectionFuture<?> waiter = pendingFutures.poll();
    if (waiter != null)
    {
      openConnectionFor(waiter);
    }
  }



  // Logs the provided event followed by the pool counters. The event
  // is passed as an argument rather than being part of the format
  // string, so that a '%' in it cannot break the formatting.
  private void logState(String event)
  {
    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINEST))
    {
      StaticUtils.DEBUG_LOG.finest(String.format(
          "%snumConnections: %d, poolSize: %d, pendingFutures: %d",
          event, numConnections, pool.size(), pendingFutures.size()));
    }
  }
}
sdk/src/org/opends/sdk/ldap/LDAPConnection.java
@@ -38,6 +38,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLContext; @@ -130,11 +131,15 @@ saslContext.evaluateCredentials(result .getServerSASLCredentials()); } catch (SaslException se) catch (SaslException e) { pendingBindOrStartTLS = -1; Result errorResult = adaptException(se); // FIXME: I18N need to have a better error message. // FIXME: Is this the best result code? Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_LOCAL_ERROR).setDiagnosticMessage( "An error occurred during SASL authentication").setCause(e); future.handleErrorResult(errorResult); return; } @@ -163,7 +168,10 @@ { pendingRequests.remove(messageID); Result errorResult = adaptException(e); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.handleErrorResult(errorResult); } @@ -251,7 +259,20 @@ */ public void handleException(Throwable throwable) { Result errorResult = adaptException(throwable); Result errorResult; if(throwable instanceof EOFException) { // FIXME: Is this the best result code? errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_SERVER_DOWN).setCause(throwable); } else { // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? 
errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_DECODING_ERROR).setCause(throwable); } connectionErrorOccurred(errorResult); } @@ -625,7 +646,8 @@ private boolean isClosed = false; private final List<ConnectionEventListener> listeners = new LinkedList<ConnectionEventListener>(); private final List<ConnectionEventListener> listeners = new CopyOnWriteArrayList<ConnectionEventListener>(); private final AtomicInteger nextMsgID = new AtomicInteger(1); @@ -702,7 +724,10 @@ } catch (IOException e) { Result errorResult = adaptException(e); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); } } @@ -754,7 +779,10 @@ { pendingRequests.remove(messageID); Result errorResult = adaptException(e); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.handleErrorResult(errorResult); } @@ -847,7 +875,11 @@ } catch (SaslException e) { Result errorResult = adaptException(e); // FIXME: I18N need to have a better error message. // FIXME: Is this the best result code? Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_LOCAL_ERROR).setDiagnosticMessage( "An error occurred during SASL authentication").setCause(e); future.handleErrorResult(errorResult); return future; } @@ -860,7 +892,8 @@ else { pendingRequests.remove(messageID); future.handleResult(Responses.newBindResult(ResultCode.PROTOCOL_ERROR) future.handleResult(Responses.newBindResult( ResultCode.CLIENT_SIDE_AUTH_UNKNOWN) .setDiagnosticMessage("Auth type not supported")); } asn1Writer.flush(); @@ -869,7 +902,10 @@ { pendingRequests.remove(messageID); Result errorResult = adaptException(e); // FIXME: what other sort of IOExceptions can be thrown? 
// FIXME: Is this the best result code? Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.handleErrorResult(errorResult); } @@ -890,25 +926,25 @@ */ public void close() { close(Requests.newUnbindRequest()); close(Requests.newUnbindRequest(), null); } /** * {@inheritDoc} */ public void close(UnbindRequest request) throws NullPointerException { public void close(UnbindRequest request, String reason) throws NullPointerException { // FIXME: I18N need to internationalize this message. Validator.ensureNotNull(request); close(request, false, Responses.newResult(ResultCode.CLIENT_SIDE_USER_CANCELLED) .setDiagnosticMessage("Connection closed by client")); close(request, false, Responses.newResult(ResultCode.CLIENT_SIDE_USER_CANCELLED) .setDiagnosticMessage("Connection closed by client" + (reason != null ? ": " + reason : ""))); } /** * {@inheritDoc} */ @@ -950,7 +986,10 @@ { pendingRequests.remove(messageID); Result errorResult = adaptException(e); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.handleErrorResult(errorResult); } @@ -1005,7 +1044,10 @@ { pendingRequests.remove(messageID); Result errorResult = adaptException(e); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.handleErrorResult(errorResult); } @@ -1081,7 +1123,10 @@ { pendingRequests.remove(messageID); Result errorResult = adaptException(e); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? 
Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.handleErrorResult(errorResult); } @@ -1136,7 +1181,10 @@ { pendingRequests.remove(messageID); Result errorResult = adaptException(e); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.handleErrorResult(errorResult); } @@ -1191,7 +1239,10 @@ { pendingRequests.remove(messageID); Result errorResult = adaptException(e); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.handleErrorResult(errorResult); } @@ -1264,7 +1315,10 @@ { pendingRequests.remove(messageID); Result errorResult = adaptException(e); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.handleErrorResult(errorResult); } @@ -1307,57 +1361,6 @@ private Result adaptException(Throwable t) { if (t instanceof ExecutionException) { ExecutionException e = (ExecutionException) t; t = e.getCause(); } Result errorResult; try { throw t; } catch (SaslException e) { // FIXME: I18N need to have a better error message. // FIXME: Is this the best result code? errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setDiagnosticMessage( "An error occurred during SASL authentication").setCause(e); } catch (EOFException e) { // FIXME: I18N need to have a better error message. // FIXME: what sort of IOExceptions can be thrown? // FIXME: Is this the best result code? 
errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_SERVER_DOWN).setDiagnosticMessage( "Connection unexpectedly terminated by server").setCause(e); } catch (IOException e) { // FIXME: I18N need to have a better error message. // FIXME: what sort of IOExceptions can be thrown? // FIXME: Is this the best result code? errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setDiagnosticMessage( "An error occurred whilst attempting to send a request: " + e.toString()).setCause(e); } catch (Throwable e) { // FIXME: I18N need to have a better error message. // FIXME: Is this the best result code? errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setDiagnosticMessage( "An unknown error occurred: " + e.toString()).setCause(e); } return errorResult; } private void close(UnbindRequest unbindRequest, boolean isDisconnectNotification, Result reason) { @@ -1489,8 +1492,8 @@ { for (ConnectionEventListener listener : listeners) { listener.connectionErrorOccurred(false, ErrorResultException .wrap(reason)); listener.connectionErrorOccurred(isDisconnectNotification, ErrorResultException.wrap(reason)); } } } @@ -1506,16 +1509,16 @@ // TODO uncomment if we decide these methods are useful. // /** // * {@inheritDoc} // */ // public boolean isClosed() // { // synchronized (writeLock) // { // return isClosed; // } // } /** * {@inheritDoc} */ public boolean isClosed() { synchronized (writeLock) { return isClosed; } } // // // @@ -1645,11 +1648,23 @@ sslHandshaker.handshake(reader, writer, sslEngineConfigurator) .get(); } catch (ExecutionException ee) { // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? 
Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_CONNECT_ERROR).setCause(ee.getCause()); connectionErrorOccurred(errorResult); throw ErrorResultException.wrap(errorResult); } catch (Exception e) { Result result = adaptException(e); connectionErrorOccurred(result); throw ErrorResultException.wrap(result); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_CONNECT_ERROR).setCause(e); connectionErrorOccurred(errorResult); throw ErrorResultException.wrap(errorResult); } }