opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/AbstractLDAPFutureResultImpl.java
File was renamed from opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/AbstractResultFutureImpl.java @@ -41,12 +41,12 @@ /** * Abstract result future implementation. * Abstract future result implementation. * * @param <S> * The type of result returned by this future. */ abstract class AbstractResultFutureImpl<S extends Result> extends abstract class AbstractLDAPFutureResultImpl<S extends Result> extends AbstractFutureResult<S> implements FutureResult<S> { private final LDAPConnection connection; @@ -65,7 +65,7 @@ * @param connection * The client connection. */ AbstractResultFutureImpl(int messageID, AbstractLDAPFutureResultImpl(int messageID, ResultHandler<? super S> handler, LDAPConnection connection) { super(handler); @@ -78,7 +78,8 @@ /** * {@inheritDoc} */ protected final ErrorResultException handleCancelRequest() protected final ErrorResultException handleCancelRequest( boolean mayInterruptIfRunning) { connection.abandon(Requests.newAbandonRequest(messageID)); return null; opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPBindFutureResultImpl.java
File was renamed from opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/BindResultFutureImpl.java @@ -42,8 +42,8 @@ /** * Bind result future implementation. */ final class BindResultFutureImpl extends AbstractResultFutureImpl<BindResult> implements final class LDAPBindFutureResultImpl extends AbstractLDAPFutureResultImpl<BindResult> implements FutureResult<BindResult> { private final BindRequest request; @@ -52,7 +52,7 @@ BindResultFutureImpl(int messageID, BindRequest request, LDAPBindFutureResultImpl(int messageID, BindRequest request, ResultHandler<? super BindResult> handler, LDAPConnection connection) { opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPCompareFutureResultImpl.java
File was renamed from opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/CompareResultFutureImpl.java @@ -41,15 +41,15 @@ /** * Compare result future implementation. */ final class CompareResultFutureImpl extends AbstractResultFutureImpl<CompareResult> implements final class LDAPCompareFutureResultImpl extends AbstractLDAPFutureResultImpl<CompareResult> implements FutureResult<CompareResult> { private final CompareRequest request; CompareResultFutureImpl(int messageID, CompareRequest request, LDAPCompareFutureResultImpl(int messageID, CompareRequest request, ResultHandler<? super CompareResult> handler, LDAPConnection connection) { opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
@@ -83,13 +83,13 @@ @Override public void handleAddResult(int messageID, Result result) { AbstractResultFutureImpl<?> pendingRequest = pendingRequests AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests .remove(messageID); if (pendingRequest != null) { if (pendingRequest instanceof ResultFutureImpl) if (pendingRequest instanceof LDAPFutureResultImpl) { ResultFutureImpl future = (ResultFutureImpl) pendingRequest; LDAPFutureResultImpl future = (LDAPFutureResultImpl) pendingRequest; if (future.getRequest() instanceof AddRequest) { future.setResultOrError(result); @@ -108,13 +108,13 @@ @Override public void handleBindResult(int messageID, BindResult result) { AbstractResultFutureImpl<?> pendingRequest = pendingRequests AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests .remove(messageID); if (pendingRequest != null) { if (pendingRequest instanceof BindResultFutureImpl) if (pendingRequest instanceof LDAPBindFutureResultImpl) { BindResultFutureImpl future = ((BindResultFutureImpl) pendingRequest); LDAPBindFutureResultImpl future = ((LDAPBindFutureResultImpl) pendingRequest); BindRequest request = future.getRequest(); if (request instanceof SASLBindRequest<?>) @@ -215,13 +215,13 @@ @Override public void handleCompareResult(int messageID, CompareResult result) { AbstractResultFutureImpl<?> pendingRequest = pendingRequests AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests .remove(messageID); if (pendingRequest != null) { if (pendingRequest instanceof CompareResultFutureImpl) if (pendingRequest instanceof LDAPCompareFutureResultImpl) { CompareResultFutureImpl future = (CompareResultFutureImpl) pendingRequest; LDAPCompareFutureResultImpl future = (LDAPCompareFutureResultImpl) pendingRequest; future.setResultOrError(result); } else @@ -239,13 +239,13 @@ @Override public void handleDeleteResult(int messageID, Result result) { AbstractResultFutureImpl<?> pendingRequest = pendingRequests AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests .remove(messageID); if (pendingRequest != null) { if (pendingRequest instanceof ResultFutureImpl) if (pendingRequest instanceof LDAPFutureResultImpl) { ResultFutureImpl future = (ResultFutureImpl) pendingRequest; LDAPFutureResultImpl future = (LDAPFutureResultImpl) pendingRequest; if (future.getRequest() instanceof DeleteRequest) { future.setResultOrError(result); @@ -322,12 +322,12 @@ } } AbstractResultFutureImpl<?> pendingRequest = pendingRequests AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests .remove(messageID); if (pendingRequest instanceof ExtendedResultFutureImpl<?>) if (pendingRequest instanceof LDAPExtendedFutureResultImpl<?>) { ExtendedResultFutureImpl<?> extendedFuture = ((ExtendedResultFutureImpl<?>) pendingRequest); LDAPExtendedFutureResultImpl<?> extendedFuture = ((LDAPExtendedFutureResultImpl<?>) pendingRequest); try { handleExtendedResult0(extendedFuture, result); @@ -357,7 +357,7 @@ public void handleIntermediateResponse(int messageID, GenericIntermediateResponse response) { AbstractResultFutureImpl<?> pendingRequest = pendingRequests AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests .remove(messageID); if (pendingRequest != null) { @@ -365,10 +365,10 @@ // FIXME: intermediate responses can occur for all operations. // if (pendingRequest instanceof ExtendedResultFutureImpl) // if (pendingRequest instanceof LDAPExtendedFutureResultImpl) // { // ExtendedResultFutureImpl extendedFuture = // ((ExtendedResultFutureImpl) pendingRequest); // LDAPExtendedFutureResultImpl extendedFuture = // ((LDAPExtendedFutureResultImpl) pendingRequest); // ExtendedRequest request = extendedFuture.getRequest(); // // try @@ -400,13 +400,13 @@ @Override public void handleModifyDNResult(int messageID, Result result) { AbstractResultFutureImpl<?> pendingRequest = pendingRequests AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests .remove(messageID); if (pendingRequest != null) { if (pendingRequest instanceof ResultFutureImpl) if (pendingRequest instanceof LDAPFutureResultImpl) { ResultFutureImpl future = (ResultFutureImpl) pendingRequest; LDAPFutureResultImpl future = (LDAPFutureResultImpl) pendingRequest; if (future.getRequest() instanceof ModifyDNRequest) { future.setResultOrError(result); @@ -425,13 +425,13 @@ @Override public void handleModifyResult(int messageID, Result result) { AbstractResultFutureImpl<?> pendingRequest = pendingRequests AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests .remove(messageID); if (pendingRequest != null) { if (pendingRequest instanceof ResultFutureImpl) if (pendingRequest instanceof LDAPFutureResultImpl) { ResultFutureImpl future = (ResultFutureImpl) pendingRequest; LDAPFutureResultImpl future = (LDAPFutureResultImpl) pendingRequest; if (future.getRequest() instanceof ModifyRequest) { future.setResultOrError(result); @@ -450,13 +450,13 @@ @Override public void handleSearchResult(int messageID, Result result) { AbstractResultFutureImpl<?> pendingRequest = pendingRequests AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests .remove(messageID); if (pendingRequest != null) { if (pendingRequest instanceof SearchResultFutureImpl) if (pendingRequest instanceof LDAPSearchFutureResultImpl) { ((SearchResultFutureImpl) pendingRequest) ((LDAPSearchFutureResultImpl) pendingRequest) .setResultOrError(result); } else @@ -475,13 +475,13 @@ public void handleSearchResultEntry(int messageID, SearchResultEntry entry) { AbstractResultFutureImpl<?> pendingRequest = pendingRequests AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests .get(messageID); if (pendingRequest != null) { if (pendingRequest instanceof SearchResultFutureImpl) if (pendingRequest instanceof LDAPSearchFutureResultImpl) { ((SearchResultFutureImpl) pendingRequest) ((LDAPSearchFutureResultImpl) pendingRequest) .handleSearchResultEntry(entry); } else @@ -500,13 +500,13 @@ public void handleSearchResultReference(int messageID, SearchResultReference reference) { AbstractResultFutureImpl<?> pendingRequest = pendingRequests AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests .get(messageID); if (pendingRequest != null) { if (pendingRequest instanceof SearchResultFutureImpl) if (pendingRequest instanceof LDAPSearchFutureResultImpl) { ((SearchResultFutureImpl) pendingRequest) ((LDAPSearchFutureResultImpl) pendingRequest) .handleSearchResultReference(reference); } else @@ -658,7 +658,7 @@ private volatile int pendingBindOrStartTLS = -1; private final ConcurrentHashMap<Integer, AbstractResultFutureImpl<?>> pendingRequests = new ConcurrentHashMap<Integer, AbstractResultFutureImpl<?>>(); private final ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>> pendingRequests = new ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>>(); private final InetSocketAddress serverAddress; @@ -697,7 +697,7 @@ */ public void abandon(AbandonRequest request) { AbstractResultFutureImpl<?> pendingRequest = pendingRequests AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests .remove(request.getMessageID()); if (pendingRequest != null) { @@ -751,7 +751,7 @@ ResultHandler<Result> handler) { int messageID = nextMsgID.getAndIncrement(); ResultFutureImpl future = new ResultFutureImpl(messageID, request, LDAPFutureResultImpl future = new LDAPFutureResultImpl(messageID, request, handler, this); ASN1StreamWriter asn1Writer = connFactory .getASN1Writer(streamWriter); @@ -830,7 +830,7 @@ ResultHandler<? super BindResult> handler) { int messageID = nextMsgID.getAndIncrement(); BindResultFutureImpl future = new BindResultFutureImpl(messageID, LDAPBindFutureResultImpl future = new LDAPBindFutureResultImpl(messageID, request, handler, this); ASN1StreamWriter asn1Writer = connFactory .getASN1Writer(streamWriter); @@ -949,7 +949,7 @@ ResultHandler<? super CompareResult> handler) { int messageID = nextMsgID.getAndIncrement(); CompareResultFutureImpl future = new CompareResultFutureImpl( LDAPCompareFutureResultImpl future = new LDAPCompareFutureResultImpl( messageID, request, handler, this); ASN1StreamWriter asn1Writer = connFactory .getASN1Writer(streamWriter); @@ -1007,7 +1007,7 @@ ResultHandler<Result> handler) { int messageID = nextMsgID.getAndIncrement(); ResultFutureImpl future = new ResultFutureImpl(messageID, request, LDAPFutureResultImpl future = new LDAPFutureResultImpl(messageID, request, handler, this); ASN1StreamWriter asn1Writer = connFactory .getASN1Writer(streamWriter); @@ -1065,7 +1065,7 @@ ExtendedRequest<R> request, ResultHandler<? super R> handler) { int messageID = nextMsgID.getAndIncrement(); ExtendedResultFutureImpl<R> future = new ExtendedResultFutureImpl<R>( LDAPExtendedFutureResultImpl<R> future = new LDAPExtendedFutureResultImpl<R>( messageID, request, handler, this); ASN1StreamWriter asn1Writer = connFactory .getASN1Writer(streamWriter); @@ -1142,7 +1142,7 @@ ResultHandler<Result> handler) { int messageID = nextMsgID.getAndIncrement(); ResultFutureImpl future = new ResultFutureImpl(messageID, request, LDAPFutureResultImpl future = new LDAPFutureResultImpl(messageID, request, handler, this); ASN1StreamWriter asn1Writer = connFactory .getASN1Writer(streamWriter); @@ -1200,7 +1200,7 @@ ResultHandler<Result> handler) { int messageID = nextMsgID.getAndIncrement(); ResultFutureImpl future = new ResultFutureImpl(messageID, request, LDAPFutureResultImpl future = new LDAPFutureResultImpl(messageID, request, handler, this); ASN1StreamWriter asn1Writer = connFactory .getASN1Writer(streamWriter); @@ -1275,7 +1275,7 @@ SearchResultHandler searchResulthandler) { int messageID = nextMsgID.getAndIncrement(); SearchResultFutureImpl future = new SearchResultFutureImpl( LDAPSearchFutureResultImpl future = new LDAPSearchFutureResultImpl( messageID, request, resultHandler, searchResulthandler, this); ASN1StreamWriter asn1Writer = connFactory .getASN1Writer(streamWriter); @@ -1393,7 +1393,7 @@ } // First abort all outstanding requests. for (AbstractResultFutureImpl<?> future : pendingRequests for (AbstractLDAPFutureResultImpl<?> future : pendingRequests .values()) { if (pendingBindOrStartTLS <= 0) @@ -1561,7 +1561,7 @@ // Needed in order to expose type information. private <R extends Result> void handleExtendedResult0( ExtendedResultFutureImpl<R> future, GenericExtendedResult result) LDAPExtendedFutureResultImpl<R> future, GenericExtendedResult result) throws DecodeException { R decodedResponse = future.decodeResponse(result.getResultCode(), @@ -1592,7 +1592,7 @@ private void handleIncorrectResponse( AbstractResultFutureImpl<?> pendingRequest) AbstractLDAPFutureResultImpl<?> pendingRequest) { // FIXME: I18N need to have a better error message. Result errorResult = Responses.newResult( opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
@@ -33,17 +33,20 @@ import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import javax.net.ssl.SSLContext; import org.opends.sdk.*; import org.opends.sdk.ldap.LDAPConnectionOptions; import org.opends.sdk.controls.*; import org.opends.sdk.extensions.StartTLSRequest; import org.opends.sdk.ldap.LDAPConnectionOptions; import org.opends.sdk.responses.Responses; import org.opends.sdk.responses.Result; import com.sun.grizzly.CompletionHandler; import com.sun.grizzly.Connection; import com.sun.grizzly.TransportFactory; import com.sun.grizzly.attributes.Attribute; import com.sun.grizzly.filterchain.PatternFilterChainFactory; @@ -53,6 +56,9 @@ import com.sun.grizzly.ssl.SSLFilter; import com.sun.grizzly.ssl.SSLHandshaker; import com.sun.grizzly.streams.StreamWriter; import com.sun.opends.sdk.util.CompletedFutureResult; import com.sun.opends.sdk.util.FutureResultTransformer; import com.sun.opends.sdk.util.RecursiveFutureResult; import com.sun.opends.sdk.util.Validator; @@ -69,7 +75,7 @@ @Override protected LDAPMessageHandler getMessageHandler( com.sun.grizzly.Connection<?> connection) Connection<?> connection) { return ldapConnectionAttr.get(connection).getLDAPMessageHandler(); } @@ -77,8 +83,7 @@ @Override protected void removeMessageHandler( com.sun.grizzly.Connection<?> connection) protected void removeMessageHandler(Connection<?> connection) { ldapConnectionAttr.remove(connection); } @@ -87,226 +92,96 @@ private static class FailedImpl implements FutureResult<AsynchronousConnection> private final class FutureResultImpl implements CompletionHandler<Connection> { private volatile ErrorResultException exception; private final FutureResultTransformer<Result, AsynchronousConnection> futureStartTLSResult; private final RecursiveFutureResult<LDAPConnection, Result> futureConnectionResult; private LDAPConnection connection; private FailedImpl(ErrorResultException exception) { this.exception = exception; } public boolean cancel(boolean mayInterruptIfRunning) { return false; } public AsynchronousConnection get() throws InterruptedException, ErrorResultException { throw exception; } public AsynchronousConnection get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ErrorResultException { throw exception; } public boolean isCancelled() { return false; } public boolean isDone() { return false; } public int getRequestID() { return -1; } } private class ResultFutureImpl implements FutureResult<AsynchronousConnection>, com.sun.grizzly.CompletionHandler<com.sun.grizzly.Connection>, ResultHandler<Result> { private volatile AsynchronousConnection connection; private volatile ErrorResultException exception; private volatile Future<com.sun.grizzly.Connection> connectFuture; private volatile FutureResult<?> sslFuture; private final CountDownLatch latch = new CountDownLatch(1); private final ResultHandler<? super AsynchronousConnection> handler; private boolean cancelled; private ResultFutureImpl( private FutureResultImpl( ResultHandler<? super AsynchronousConnection> handler) { this.handler = handler; this.futureStartTLSResult = new FutureResultTransformer<Result, AsynchronousConnection>( handler) { protected ErrorResultException transformErrorResult( ErrorResultException errorResult) { // Ensure that the connection is closed. try { connection.close(); } catch (Exception e) { // Ignore. } return errorResult; } public boolean cancel(boolean mayInterruptIfRunning) protected LDAPConnection transformResult(Result result) throws ErrorResultException { cancelled = connectFuture.cancel(mayInterruptIfRunning) || sslFuture != null && sslFuture.cancel(mayInterruptIfRunning); if (cancelled) { latch.countDown(); } return cancelled; } public AsynchronousConnection get() throws InterruptedException, ErrorResultException { latch.await(); if (cancelled) { throw new CancellationException(); } if (exception != null) { throw exception; } return connection; } }; public AsynchronousConnection get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ErrorResultException this.futureConnectionResult = new RecursiveFutureResult<LDAPConnection, Result>( futureStartTLSResult) { latch.await(timeout, unit); if (cancelled) protected FutureResult<? extends Result> chainResult( LDAPConnection innerResult, ResultHandler<? super Result> handler) throws ErrorResultException { throw new CancellationException(); } if (exception != null) { throw exception; } return connection; } public boolean isCancelled() { return cancelled; } public boolean isDone() { return latch.getCount() == 0; } public int getRequestID() { return -1; } /** * {@inheritDoc} */ public void cancelled(com.sun.grizzly.Connection connection) { // Ignore this. } /** * {@inheritDoc} */ public void completed(com.sun.grizzly.Connection connection, com.sun.grizzly.Connection result) { LDAPConnection ldapConn = adaptConnection(connection); this.connection = adaptConnection(connection); connection = innerResult; if (options.getSSLContext() != null && options.useStartTLS()) { StartTLSRequest startTLS = new StartTLSRequest(options .getSSLContext()); sslFuture = this.connection.extendedRequest(startTLS, this); return connection.extendedRequest(startTLS, handler); } else if (options.getSSLContext() != null) if (options.getSSLContext() != null) { try { ldapConn.installFilter(sslFilter); ldapConn.performSSLHandshake(sslHandshaker, connection.installFilter(sslFilter); connection.performSSLHandshake(sslHandshaker, sslEngineConfigurator); latch.countDown(); if (handler != null) } catch (ErrorResultException errorResult) { handler.handleResult(this.connection); } } catch (CancellationException ce) try { // Handshake cancelled. latch.countDown(); connection.close(); connection = null; } catch (ErrorResultException throwable) catch (Exception ignored) { exception = throwable; latch.countDown(); if (handler != null) { handler.handleErrorResult(exception); } throw errorResult; } } handler.handleResult(null); return new CompletedFutureResult<Result>((Result) null); } else { latch.countDown(); if (handler != null) { handler.handleResult(this.connection); } } }; futureStartTLSResult.setFutureResult(futureConnectionResult); } @@ -314,52 +189,42 @@ /** * {@inheritDoc} */ public void failed(com.sun.grizzly.Connection connection, Throwable throwable) { exception = adaptConnectionException(throwable); latch.countDown(); if (handler != null) { handler.handleErrorResult(exception); } } /** * {@inheritDoc} */ public void updated(com.sun.grizzly.Connection connection, com.sun.grizzly.Connection result) public void cancelled(Connection connection) { // Ignore this. } // This is called when the StartTLS request is successful public void handleResult(Result result) /** * {@inheritDoc} */ public void completed(Connection connection, Connection result) { latch.countDown(); if (handler != null) { handler.handleResult(connection); } futureConnectionResult.handleResult(adaptConnection(connection)); } // This is called when the StartTLS request is not successful public void handleErrorResult(ErrorResultException error) /** * {@inheritDoc} */ public void failed(Connection connection, Throwable throwable) { exception = error; latch.countDown(); if (handler != null) futureConnectionResult .handleErrorResult(adaptConnectionException(throwable)); } /** * {@inheritDoc} */ public void updated(Connection connection, Connection result) { handler.handleErrorResult(exception); // Ignore this. } } } @@ -510,17 +375,18 @@ public FutureResult<AsynchronousConnection> getAsynchronousConnection( ResultHandler<? super AsynchronousConnection> handler) { ResultFutureImpl future = new ResultFutureImpl(handler); FutureResultImpl future = new FutureResultImpl(handler); try { future.connectFuture = transport.connect(socketAddress, future); return future; future.futureConnectionResult.setFutureResult(transport.connect( socketAddress, future)); return future.futureStartTLSResult; } catch (IOException e) { ErrorResultException result = adaptConnectionException(e); return new FailedImpl(result); return new CompletedFutureResult<AsynchronousConnection>(result); } } @@ -583,8 +449,7 @@ private LDAPConnection adaptConnection( com.sun.grizzly.Connection<?> connection) private LDAPConnection adaptConnection(Connection<?> connection) { // Test shows that its much faster with non block writes but risk // running out of memory if the server is slow. opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPExtendedFutureResultImpl.java
File was renamed from opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/ExtendedResultFutureImpl.java @@ -38,14 +38,14 @@ /** * Extended result future implementation. */ final class ExtendedResultFutureImpl<R extends Result> extends AbstractResultFutureImpl<R> implements FutureResult<R> final class LDAPExtendedFutureResultImpl<R extends Result> extends AbstractLDAPFutureResultImpl<R> implements FutureResult<R> { private final ExtendedRequest<R> request; ExtendedResultFutureImpl(int messageID, ExtendedRequest<R> request, LDAPExtendedFutureResultImpl(int messageID, ExtendedRequest<R> request, ResultHandler<? super R> handler, LDAPConnection connection) { super(messageID, handler, connection); opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPFutureResultImpl.java
File was renamed from opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/ResultFutureImpl.java @@ -41,14 +41,14 @@ /** * Result future implementation. */ public final class ResultFutureImpl extends AbstractResultFutureImpl<Result> implements FutureResult<Result> public final class LDAPFutureResultImpl extends AbstractLDAPFutureResultImpl<Result> implements FutureResult<Result> { private final Request request; ResultFutureImpl(int messageID, Request request, LDAPFutureResultImpl(int messageID, Request request, ResultHandler<Result> handler, LDAPConnection connection) { super(messageID, handler, connection); opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPSearchFutureResultImpl.java
File was renamed from opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/SearchResultFutureImpl.java @@ -44,8 +44,8 @@ /** * Search result future implementation. */ final class SearchResultFutureImpl extends AbstractResultFutureImpl<Result> implements FutureResult<Result> final class LDAPSearchFutureResultImpl extends AbstractLDAPFutureResultImpl<Result> implements FutureResult<Result> { private final SearchResultHandler searchResultHandler; @@ -54,7 +54,7 @@ SearchResultFutureImpl(int messageID, SearchRequest request, LDAPSearchFutureResultImpl(int messageID, SearchRequest request, ResultHandler<Result> resultHandler, SearchResultHandler searchResultHandler, LDAPConnection connection) { opendj-sdk/sdk/src/com/sun/opends/sdk/util/CompletedFutureResult.java
New file @@ -0,0 +1,179 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009 Sun Microsystems, Inc. */ package com.sun.opends.sdk.util; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opends.sdk.ErrorResultException; import org.opends.sdk.FutureResult; /** * An implementation of {@code FutureResult} which can be used in cases * where the result is known in advance, for example, if the result is * obtained synchronously. * * @param <S> * The type of result returned by this future. */ public final class CompletedFutureResult<S> implements FutureResult<S> { private final S result; private final ErrorResultException errorResult; private final int requestID; /** * Creates a new completed future which will return the provided * result and request ID of {@code -1}. * * @param result * The result, which may be {@code null}. */ public CompletedFutureResult(S result) { this(result, -1); } /** * Creates a new completed future which will throw the provided error * result and request ID of {@code -1}. * * @param errorResult * The error result. * @throws NullPointerException * If {@code errorResult} was {@code null}. */ public CompletedFutureResult(ErrorResultException errorResult) throws NullPointerException { this(errorResult, -1); } /** * Creates a new completed future which will return the provided * result and request ID. * * @param result * The result, which may be {@code null}. * @param requestID * The request ID. */ public CompletedFutureResult(S result, int requestID) { this.result = result; this.errorResult = null; this.requestID = requestID; } /** * Creates a new completed future which will throw the provided error * result and request ID. * * @param errorResult * The error result. * @param requestID * The request ID. * @throws NullPointerException * If {@code errorResult} was {@code null}. */ public CompletedFutureResult(ErrorResultException errorResult, int requestID) throws NullPointerException { Validator.ensureNotNull(errorResult); this.result = null; this.errorResult = errorResult; this.requestID = requestID; } public boolean cancel(boolean mayInterruptIfRunning) { return false; } public S get() throws ErrorResultException, InterruptedException { if (errorResult == null) { // May be null. return result; } else { throw errorResult; } } public S get(long timeout, TimeUnit unit) throws ErrorResultException, TimeoutException, InterruptedException { return get(); } public int getRequestID() { return requestID; } public boolean isCancelled() { return false; } public boolean isDone() { return true; } } opendj-sdk/sdk/src/com/sun/opends/sdk/util/FutureResultTransformer.java
@@ -55,7 +55,7 @@ private final ResultHandler<? super N> handler; private volatile FutureResult<M> future = null; private volatile FutureResult<? extends M> future = null; // These do not need to be volatile since the future acts as a memory // barrier. @@ -194,7 +194,7 @@ * @param future * The inner future. */ public final void setFutureResult(FutureResult<M> future) public final void setFutureResult(FutureResult<? extends M> future) { this.future = future; } opendj-sdk/sdk/src/com/sun/opends/sdk/util/RecursiveFutureResult.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -64,7 +65,15 @@ public int getRequestID() { return innerFuture.getRequestID(); if (innerFuture instanceof FutureResult<?>) { FutureResult<?> tmp = (FutureResult<?>) innerFuture; return tmp.getRequestID(); } else { return -1; } } @@ -89,11 +98,11 @@ private final FutureResultImpl impl; private volatile FutureResult<M> innerFuture = null; private volatile Future<?> innerFuture = null; // This does not need to be volatile since the inner future acts as a // memory barrier. private FutureResult<N> outerFuture = null; private FutureResult<? extends N> outerFuture = null; @@ -215,7 +224,7 @@ * @param future * The inner future. */ public final void setFutureResult(FutureResult<M> future) public final void setFutureResult(Future<?> future) { this.innerFuture = future; } @@ -239,7 +248,7 @@ * If the outer request could not be invoked and processing * should terminate. */ protected FutureResult<N> chainErrorResult( protected FutureResult<? extends N> chainErrorResult( ErrorResultException innerError, ResultHandler<? super N> handler) throws ErrorResultException { @@ -261,7 +270,8 @@ * If the outer request could not be invoked and processing * should terminate. */ protected abstract FutureResult<N> chainResult(M innerResult, ResultHandler<? super N> handler) throws ErrorResultException; protected abstract FutureResult<? extends N> chainResult( M innerResult, ResultHandler<? super N> handler) throws ErrorResultException; } opendj-sdk/sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java
@@ -30,10 +30,6 @@ import java.util.Collection; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opends.sdk.requests.*; import org.opends.sdk.responses.BindResult; @@ -42,6 +38,8 @@ import org.opends.sdk.responses.SearchResultEntry; import org.opends.sdk.schema.Schema; import com.sun.opends.sdk.util.FutureResultTransformer; import com.sun.opends.sdk.util.RecursiveFutureResult; import com.sun.opends.sdk.util.Validator; @@ -108,13 +106,10 @@ public FutureResult<AuthenticatedAsynchronousConnection> getAsynchronousConnection( ResultHandler<? super AuthenticatedAsynchronousConnection> handler) { // TODO: bug here? if allowRebind= false then bind will never // happen ResultFutureImpl future = new ResultFutureImpl( allowRebinds ? request : null, handler); future.connectFuture = parentFactory .getAsynchronousConnection(future.connectionHandler); return future; FutureResultImpl future = new FutureResultImpl(request, handler); future.futureConnectionResult.setFutureResult(parentFactory .getAsynchronousConnection(future.futureConnectionResult)); return future.futureBindResult; } @@ -560,169 +555,68 @@ private static final class ResultFutureImpl implements FutureResult<AuthenticatedAsynchronousConnection> private static final class FutureResultImpl { private final class ConnectionResultHandler implements ResultHandler<AsynchronousConnection> private final FutureResultTransformer<BindResult, AuthenticatedAsynchronousConnection> futureBindResult; private final RecursiveFutureResult<AsynchronousConnection, BindResult> futureConnectionResult; private final BindRequest bindRequest; private AsynchronousConnection connection; private FutureResultImpl( BindRequest request, ResultHandler<? super AuthenticatedAsynchronousConnection> handler) { public void handleResult(AsynchronousConnection conn) this.bindRequest = request; this.futureBindResult = new FutureResultTransformer<BindResult, AuthenticatedAsynchronousConnection>( handler) { connection = conn; bindFuture = connection.bind(request, bindHandler); } public void handleErrorResult(ErrorResultException error) { exception = error; latch.countDown(); } } private final class BindRequestResultHandler implements ResultHandler<BindResult> { public void handleResult(BindResult result) { // FIXME: should make the result unmodifiable. authenticatedConnection = new AuthenticatedAsynchronousConnection( connection, request, result); latch.countDown(); if (handler != null) { handler.handleResult(authenticatedConnection); } } public void handleErrorResult(ErrorResultException error) protected ErrorResultException transformErrorResult( ErrorResultException errorResult) { // Ensure that the connection is closed. try { connection.close(); connection = null; } catch (Exception e) { // Ignore. } return errorResult; } exception = error; latch.countDown(); if (handler != null) protected AuthenticatedAsynchronousConnection transformResult( BindResult result) throws ErrorResultException { handler.handleErrorResult(exception); } } // FIXME: should make the result unmodifiable. return new AuthenticatedAsynchronousConnection(connection, bindRequest, result); } private final ResultHandler<BindResult> bindHandler = new BindRequestResultHandler(); private final ResultHandler<AsynchronousConnection> connectionHandler = new ConnectionResultHandler(); private volatile AuthenticatedAsynchronousConnection authenticatedConnection; private volatile AsynchronousConnection connection; private volatile ErrorResultException exception; private volatile FutureResult<?> connectFuture; private volatile FutureResult<BindResult> bindFuture; private final CountDownLatch latch = new CountDownLatch(1); private final ResultHandler<? super AuthenticatedAsynchronousConnection> handler; private boolean cancelled; private final BindRequest request; private ResultFutureImpl( BindRequest request, ResultHandler<? super AuthenticatedAsynchronousConnection> handler) }; this.futureConnectionResult = new RecursiveFutureResult<AsynchronousConnection, BindResult>( futureBindResult) { this.request = request; this.handler = handler; } public boolean cancel(boolean mayInterruptIfRunning) protected FutureResult<? extends BindResult> chainResult( AsynchronousConnection innerResult, ResultHandler<? super BindResult> handler) throws ErrorResultException { cancelled = connectFuture.cancel(mayInterruptIfRunning) || bindFuture != null && bindFuture.cancel(mayInterruptIfRunning); if (cancelled) { latch.countDown(); connection = innerResult; return connection.bind(bindRequest, handler); } return cancelled; } public AuthenticatedAsynchronousConnection get() throws InterruptedException, ErrorResultException { latch.await(); if (cancelled) { throw new CancellationException(); } if (exception != null) { throw exception; } return authenticatedConnection; } public AuthenticatedAsynchronousConnection get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ErrorResultException { latch.await(timeout, unit); if (cancelled) { throw new CancellationException(); } if (exception != null) { throw exception; } return authenticatedConnection; } public boolean isCancelled() { return cancelled; } public boolean isDone() { return latch.getCount() == 0; } public int getRequestID() { return -1; }; futureBindResult.setFutureResult(futureConnectionResult); } } opendj-sdk/sdk/src/org/opends/sdk/ConnectionFactory.java
@@ -73,9 +73,9 @@ * Initiates an asynchronous connection request to the Directory * Server associated with this connection factory. The returned * {@code FutureResult} can be used to retrieve the completed * asynchronous connection. Alternatively, if a {@code * ResultHandler} is provided, the handler will be notified * when the connection is available and ready for use. * asynchronous connection. Alternatively, if a {@code ResultHandler} * is provided, the handler will be notified when the connection is * available and ready for use. * * @param handler * The completion handler, or {@code null} if no handler is opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -32,15 +32,15 @@ import java.util.Collection; import java.util.Stack; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Level; import org.opends.sdk.requests.*; import org.opends.sdk.responses.*; import org.opends.sdk.schema.Schema; import com.sun.opends.sdk.util.AbstractFutureResult; import com.sun.opends.sdk.util.CompletedFutureResult; import com.sun.opends.sdk.util.FutureResultTransformer; import com.sun.opends.sdk.util.StaticUtils; @@ -48,7 +48,7 @@ /** * A simple connection pool implementation. */ public class ConnectionPool extends public final class ConnectionPool extends AbstractConnectionFactory<AsynchronousConnection> { private final ConnectionFactory<?> connectionFactory; @@ -60,12 +60,33 @@ // FIXME: should use a better collection than this - CLQ? private final Stack<AsynchronousConnection> pool; private final ConcurrentLinkedQueue<PendingResultFuture> pendingFutures; private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures; private final Object lock = new Object(); private final class FutureNewConnection extends FutureResultTransformer<AsynchronousConnection, AsynchronousConnection> { private FutureNewConnection( ResultHandler<? super AsynchronousConnection> handler) { super(handler); } protected AsynchronousConnection transformResult( AsynchronousConnection result) throws ErrorResultException { return new PooledConnectionWapper(result); } } private final class PooledConnectionWapper implements AsynchronousConnection, ConnectionEventListener { @@ -144,13 +165,25 @@ return; } // See if there waiters pending PendingResultFuture future = pendingFutures.poll(); if (future != null) // See if there waiters pending. for (;;) { FuturePooledConnection future = pendingFutures.poll(); if (future == null) { // No waiters - so drop out and add connection to pool. break; } PooledConnectionWapper pooledConnection = new PooledConnectionWapper( connection); future.connection(pooledConnection); future.handleResult(pooledConnection); if (!future.isCancelled()) { // The future was not cancelled and the connection was // accepted. if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG @@ -163,6 +196,7 @@ } return; } } // No waiters. Put back in pool. pool.push(connection); @@ -438,165 +472,26 @@ private static final class CompletedResultFuture implements FutureResult<AsynchronousConnection> // Future used for waiting for pooled connections to become available. private static final class FuturePooledConnection extends AbstractFutureResult<AsynchronousConnection> { private final PooledConnectionWapper connection; private CompletedResultFuture(PooledConnectionWapper connection) { this.connection = connection; } public boolean cancel(boolean mayInterruptIfRunning) { return false; } public AsynchronousConnection get() throws InterruptedException, ErrorResultException { return connection; } public AsynchronousConnection get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ErrorResultException { return connection; } public boolean isCancelled() { return false; } public boolean isDone() { return true; } public int getRequestID() { return -1; } } private final class PendingResultFuture implements FutureResult<AsynchronousConnection> { private volatile boolean isCancelled; private volatile PooledConnectionWapper connection; private volatile ErrorResultException err; private final ResultHandler<? super AsynchronousConnection> handler; private final CountDownLatch latch = new CountDownLatch(1); private PendingResultFuture( private FuturePooledConnection( ResultHandler<? super AsynchronousConnection> handler) { this.handler = handler; super(handler); } public synchronized boolean cancel(boolean mayInterruptIfRunning) { return pendingFutures.remove(this) && (isCancelled = true); } public AsynchronousConnection get() throws InterruptedException, ErrorResultException { latch.await(); if (err != null) { throw err; } return connection; } public AsynchronousConnection get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ErrorResultException { latch.await(timeout, unit); if (err != null) { throw err; } return connection; } public synchronized boolean isCancelled() { return isCancelled; } public boolean isDone() { return latch.getCount() == 0; } /** * {@inheritDoc} */ public int getRequestID() { return -1; } private void connection(PooledConnectionWapper connection) { this.connection = connection; if (handler != null) { handler.handleResult(connection); } latch.countDown(); } private void error(ErrorResultException e) { this.err = e; if (handler != null) { handler.handleErrorResult(e); } latch.countDown(); } } @@ -617,38 +512,7 @@ this.connectionFactory = connectionFactory; this.poolSize = poolSize; this.pool = new Stack<AsynchronousConnection>(); this.pendingFutures = new ConcurrentLinkedQueue<PendingResultFuture>(); } private final class WrapResultHandler implements ResultHandler<AsynchronousConnection> { private final PendingResultFuture future; private WrapResultHandler(PendingResultFuture future) { this.future = future; } public void handleResult(AsynchronousConnection connection) { PooledConnectionWapper pooledConnection = new PooledConnectionWapper( connection); future.connection(pooledConnection); } public void handleErrorResult(ErrorResultException error) { future.error(error); } this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>(); } @@ -679,19 +543,21 @@ { handler.handleResult(pooledConnection); } return new CompletedResultFuture(pooledConnection); return new CompletedFutureResult<AsynchronousConnection>( pooledConnection); } PendingResultFuture pendingFuture = new PendingResultFuture( handler); // Pool was empty. Maybe a new connection if pool size is not // reached if (numConnections < poolSize) { // We can create a new connection. numConnections++; WrapResultHandler wrapHandler = new WrapResultHandler( pendingFuture); connectionFactory.getAsynchronousConnection(wrapHandler); FutureNewConnection future = new FutureNewConnection(handler); future.setFutureResult(connectionFactory .getAsynchronousConnection(future)); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG @@ -702,11 +568,16 @@ numConnections, pool.size(), pendingFutures .size())); } return future; } else { // Have to wait pendingFutures.add(pendingFuture); // Pool is full so wait for a connection to become available. FuturePooledConnection future = new FuturePooledConnection( handler); pendingFutures.add(future); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG @@ -717,9 +588,9 @@ numConnections, pool.size(), pendingFutures .size())); } } return pendingFuture; return future; } } } } opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -32,15 +32,12 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opends.sdk.requests.*; import org.opends.sdk.responses.*; import org.opends.sdk.schema.Schema; import com.sun.opends.sdk.util.FutureResultTransformer; import com.sun.opends.sdk.util.Validator; @@ -437,126 +434,37 @@ private final class ResultFutureImpl implements FutureResult<AsynchronousConnection>, private final class FutureResultImpl extends FutureResultTransformer<AsynchronousConnection, AsynchronousConnection> implements FutureResult<AsynchronousConnection>, ResultHandler<AsynchronousConnection> { private volatile AsynchronousConnectionImpl heartBeatConnection; private volatile ErrorResultException exception; private volatile FutureResult<?> connectFuture; private final CountDownLatch latch = new CountDownLatch(1); private final ResultHandler<? super AsynchronousConnectionImpl> handler; private boolean cancelled; private ResultFutureImpl( ResultHandler<? super AsynchronousConnectionImpl> handler) private FutureResultImpl( ResultHandler<? super AsynchronousConnection> handler) { this.handler = handler; super(handler); } public boolean cancel(boolean mayInterruptIfRunning) /** * {@inheritDoc} */ protected AsynchronousConnection transformResult( AsynchronousConnection connection) throws ErrorResultException { cancelled = connectFuture.cancel(mayInterruptIfRunning); if (cancelled) { latch.countDown(); } return cancelled; } public AsynchronousConnectionImpl get() throws InterruptedException, ErrorResultException { latch.await(); if (cancelled) { throw new CancellationException(); } if (exception != null) { throw exception; } return heartBeatConnection; } public AsynchronousConnectionImpl get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ErrorResultException { latch.await(timeout, unit); if (cancelled) { throw new CancellationException(); } if (exception != null) { throw exception; } return heartBeatConnection; } public boolean isCancelled() { return cancelled; } public boolean isDone() { return latch.getCount() == 0; } public int getRequestID() { return -1; } public void handleResult(AsynchronousConnection connection) { heartBeatConnection = new AsynchronousConnectionImpl(connection); AsynchronousConnectionImpl heartBeatConnection = new AsynchronousConnectionImpl( connection); synchronized (activeConnections) { connection.addConnectionEventListener(heartBeatConnection); activeConnections.add(heartBeatConnection); } if (handler != null) { handler.handleResult(heartBeatConnection); } latch.countDown(); return heartBeatConnection; } public void handleErrorResult(ErrorResultException error) { exception = error; if (handler != null) { handler.handleErrorResult(error); } latch.countDown(); } } @@ -564,9 +472,9 @@ public FutureResult<AsynchronousConnection> getAsynchronousConnection( ResultHandler<? super AsynchronousConnection> handler) { ResultFutureImpl future = new ResultFutureImpl(handler); future.connectFuture = parentFactory .getAsynchronousConnection(future); FutureResultImpl future = new FutureResultImpl(handler); future.setFutureResult(parentFactory .getAsynchronousConnection(future)); return future; } }