opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java
@@ -226,42 +226,43 @@ LDAP_CONNECTION_ATTR.get(ctx.getConnection()); if (ldapConnection != null) { if (messageID == 0) { // Unsolicited notification received. if ((result.getOID() != null) && result.getOID().equals(OID_NOTICE_OF_DISCONNECTION)) { // Treat this as a connection error. final Result errorResult = Responses .newResult(result.getResultCode()) .setDiagnosticMessage(result.getDiagnosticMessage()); ldapConnection.close(null, true, errorResult); return; } else { // Unsolicited notification received. ldapConnection.handleUnsolicitedNotification(result); } } } else { final AbstractLDAPFutureResultImpl<?> pendingRequest = ldapConnection.removePendingRequest(messageID); final AbstractLDAPFutureResultImpl<?> pendingRequest = ldapConnection.removePendingRequest(messageID); if (pendingRequest != null) { if (pendingRequest instanceof LDAPExtendedFutureResultImpl<?>) { final LDAPExtendedFutureResultImpl<?> extendedFuture = ((LDAPExtendedFutureResultImpl<?>) pendingRequest); try { handleExtendedResult0(ldapConnection, extendedFuture, result); } catch (final DecodeException de) { // FIXME: should the connection be closed as // well? final Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_DECODING_ERROR) .setDiagnosticMessage(de.getLocalizedMessage()) .setCause(de); extendedFuture.adaptErrorResult(errorResult); if (pendingRequest != null) { if (pendingRequest instanceof LDAPExtendedFutureResultImpl<?>) { final LDAPExtendedFutureResultImpl<?> extendedFuture = ((LDAPExtendedFutureResultImpl<?>) pendingRequest); try { handleExtendedResult0(ldapConnection, extendedFuture, result); } catch (final DecodeException de) { // FIXME: should the connection be closed as // well? final Result errorResult = Responses.newResult( ResultCode.CLIENT_SIDE_DECODING_ERROR) .setDiagnosticMessage( de.getLocalizedMessage()).setCause( de); extendedFuture.adaptErrorResult(errorResult); } } else { throw new UnexpectedResponseException(messageID, result); } } else { throw new UnexpectedResponseException(messageID, result); } } } opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -31,9 +31,9 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -83,22 +83,22 @@ /** * LDAP connection implementation. * <p> * TODO: handle illegal state exceptions. */ final class LDAPConnection extends AbstractAsynchronousConnection implements Connection { private final org.glassfish.grizzly.Connection<?> connection; private Result connectionInvalidReason; private boolean isClosed = false; private final List<ConnectionEventListener> listeners = new CopyOnWriteArrayList<ConnectionEventListener>(); private final AtomicInteger nextMsgID = new AtomicInteger(1); private final AtomicBoolean bindOrStartTLSInProgress = new AtomicBoolean(false); private final org.glassfish.grizzly.Connection<?> connection; private final LDAPWriter ldapWriter = new LDAPWriter(); private final AtomicInteger nextMsgID = new AtomicInteger(1); private final LDAPOptions options; private final ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>> pendingRequests = new ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>>(); private final Object stateLock = new Object(); private final LDAPWriter ldapWriter = new LDAPWriter(); private final LDAPOptions options; // Guarded by stateLock private Result connectionInvalidReason; private boolean failedDueToDisconnect = false; private boolean isClosed = false; private boolean isFailed = false; private List<ConnectionEventListener> listeners = null; /** * Creates a new LDAP connection. @@ -116,60 +116,47 @@ /** * {@inheritDoc} */ @Override public FutureResult<Void> abandonAsync(final AbandonRequest request) { final AbstractLDAPFutureResultImpl<?> pendingRequest; final int messageID = nextMsgID.getAndIncrement(); synchronized (stateLock) { if (connectionInvalidReason != null) { return new CompletedFutureResult<Void>(newErrorResult(connectionInvalidReason), messageID); } if (bindOrStartTLSInProgress.get()) { final Result errorResult = Responses.newResult(ResultCode.OPERATIONS_ERROR).setDiagnosticMessage( "Bind or Start TLS operation in progress"); return new CompletedFutureResult<Void>(newErrorResult(errorResult), messageID); } // First remove the future associated with the request to be // abandoned. pendingRequest = pendingRequests.remove(request.getRequestID()); } if (pendingRequest == null) { // There has never been a request with the specified message ID or // the response has already been received and handled. We can ignore // this abandon request. // Message ID will be -1 since no request was sent. return new CompletedFutureResult<Void>((Void) null); } pendingRequest.cancel(false); try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.abandonRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); return new CompletedFutureResult<Void>((Void) null, messageID); } finally { asn1Writer.recycle(); synchronized (stateLock) { checkConnectionIsValid(); checkBindOrStartTLSInProgress(); // Remove the future associated with the request to be abandoned. pendingRequest = pendingRequests.remove(request.getRequestID()); } } catch (final IOException e) { // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); return new CompletedFutureResult<Void>(newErrorResult(errorResult), messageID); if (pendingRequest == null) { // There has never been a request with the specified message ID or // the response has already been received and handled. We can ignore // this abandon request. // Message ID will be -1 since no request was sent. return new CompletedFutureResult<Void>((Void) null); } pendingRequest.cancel(false); try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.abandonRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); return new CompletedFutureResult<Void>((Void) null, messageID); } finally { asn1Writer.recycle(); } } catch (final IOException e) { throw adaptRequestIOException(e); } } catch (final ErrorResultException e) { return new CompletedFutureResult<Void>(e, messageID); } } /** * {@inheritDoc} */ @Override public FutureResult<Result> addAsync(final AddRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super Result> resultHandler) { @@ -177,59 +164,67 @@ final LDAPFutureResultImpl future = new LDAPFutureResultImpl(messageID, request, resultHandler, intermediateResponseHandler, this); synchronized (stateLock) { if (connectionInvalidReason != null) { future.adaptErrorResult(connectionInvalidReason); return future; } if (bindOrStartTLSInProgress.get()) { future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR) .setDiagnosticMessage("Bind or Start TLS operation in progress")); return future; } pendingRequests.put(messageID, future); } try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.addRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); synchronized (stateLock) { checkConnectionIsValid(); checkBindOrStartTLSInProgress(); pendingRequests.put(messageID, future); } } catch (final IOException e) { pendingRequests.remove(messageID); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.adaptErrorResult(errorResult); try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.addRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); } } catch (final IOException e) { pendingRequests.remove(messageID); throw adaptRequestIOException(e); } } catch (final ErrorResultException e) { future.adaptErrorResult(e.getResult()); } return future; } /** * {@inheritDoc} */ @Override public void addConnectionEventListener(final ConnectionEventListener listener) { Validator.ensureNotNull(listener); listeners.add(listener); final boolean notifyClose; final boolean notifyErrorOccurred; synchronized (stateLock) { notifyClose = isClosed; notifyErrorOccurred = isFailed; if (!isClosed) { if (listeners == null) { listeners = new LinkedList<ConnectionEventListener>(); } listeners.add(listener); } } if (notifyErrorOccurred) { // Use the reason provided in the disconnect notification. listener.handleConnectionError(failedDueToDisconnect, newErrorResult(connectionInvalidReason)); } if (notifyClose) { listener.handleConnectionClosed(); } } /** * {@inheritDoc} */ @Override public FutureResult<BindResult> bindAsync(final BindRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super BindResult> resultHandler) { final int messageID = nextMsgID.getAndIncrement(); BindClient context; final BindClient context; try { context = request.createBindClient( @@ -253,49 +248,41 @@ new LDAPBindFutureResultImpl(messageID, context, resultHandler, intermediateResponseHandler, this); synchronized (stateLock) { if (connectionInvalidReason != null) { future.adaptErrorResult(connectionInvalidReason); return future; } if (!pendingRequests.isEmpty()) { future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR) .setDiagnosticMessage( "There are other operations pending on this connection")); return future; } if (!bindOrStartTLSInProgress.compareAndSet(false, true)) { future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR) .setDiagnosticMessage("Bind or Start TLS operation in progress")); return future; } pendingRequests.put(messageID, future); } try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { // Use the bind client to get the initial request instead of // using the bind request passed to this method. final GenericBindRequest initialRequest = context.nextBindRequest(); ldapWriter.bindRequest(asn1Writer, messageID, 3, initialRequest); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); synchronized (stateLock) { checkConnectionIsValid(); if (!pendingRequests.isEmpty()) { future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR) .setDiagnosticMessage( "There are other operations pending on this connection")); return future; } if (!bindOrStartTLSInProgress.compareAndSet(false, true)) { future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR) .setDiagnosticMessage("Bind or Start TLS operation in progress")); return future; } pendingRequests.put(messageID, future); } } catch (final IOException e) { pendingRequests.remove(messageID); bindOrStartTLSInProgress.set(false); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.adaptErrorResult(errorResult); try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { // Use the bind client to get the initial request instead of // using the bind request passed to this method. final GenericBindRequest initialRequest = context.nextBindRequest(); ldapWriter.bindRequest(asn1Writer, messageID, 3, initialRequest); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); } } catch (final IOException e) { pendingRequests.remove(messageID); bindOrStartTLSInProgress.set(false); throw adaptRequestIOException(e); } } catch (final ErrorResultException e) { future.adaptErrorResult(e.getResult()); } return future; @@ -304,6 +291,7 @@ /** * {@inheritDoc} */ @Override public void close(final UnbindRequest request, final String reason) { // FIXME: I18N need to internationalize this message. Validator.ensureNotNull(request); @@ -316,6 +304,7 @@ /** * {@inheritDoc} */ @Override public FutureResult<CompareResult> compareAsync(final CompareRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super CompareResult> resultHandler) { @@ -323,45 +312,34 @@ final LDAPCompareFutureResultImpl future = new LDAPCompareFutureResultImpl(messageID, request, resultHandler, intermediateResponseHandler, this); synchronized (stateLock) { if (connectionInvalidReason != null) { future.adaptErrorResult(connectionInvalidReason); return future; } if (bindOrStartTLSInProgress.get()) { future.setResultOrError(Responses.newCompareResult(ResultCode.OPERATIONS_ERROR) .setDiagnosticMessage("Bind or Start TLS operation in progress")); return future; } pendingRequests.put(messageID, future); } try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.compareRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); synchronized (stateLock) { checkConnectionIsValid(); checkBindOrStartTLSInProgress(); pendingRequests.put(messageID, future); } } catch (final IOException e) { pendingRequests.remove(messageID); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.adaptErrorResult(errorResult); try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.compareRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); } } catch (final IOException e) { pendingRequests.remove(messageID); throw adaptRequestIOException(e); } } catch (final ErrorResultException e) { future.adaptErrorResult(e.getResult()); } return future; } /** * {@inheritDoc} */ @Override public FutureResult<Result> deleteAsync(final DeleteRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super Result> resultHandler) { @@ -369,45 +347,34 @@ final LDAPFutureResultImpl future = new LDAPFutureResultImpl(messageID, request, resultHandler, intermediateResponseHandler, this); synchronized (stateLock) { if (connectionInvalidReason != null) { future.adaptErrorResult(connectionInvalidReason); return future; } if (bindOrStartTLSInProgress.get()) { future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR) .setDiagnosticMessage("Bind or Start TLS operation in progress")); return future; } pendingRequests.put(messageID, future); } try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.deleteRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); synchronized (stateLock) { checkConnectionIsValid(); checkBindOrStartTLSInProgress(); pendingRequests.put(messageID, future); } } catch (final IOException e) { pendingRequests.remove(messageID); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.adaptErrorResult(errorResult); try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.deleteRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); } } catch (final IOException e) { pendingRequests.remove(messageID); throw adaptRequestIOException(e); } } catch (final ErrorResultException e) { future.adaptErrorResult(e.getResult()); } return future; } /** * {@inheritDoc} */ @Override public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync( final ExtendedRequest<R> request, final IntermediateResponseHandler intermediateResponseHandler, @@ -416,68 +383,54 @@ final LDAPExtendedFutureResultImpl<R> future = new LDAPExtendedFutureResultImpl<R>(messageID, request, resultHandler, intermediateResponseHandler, this); synchronized (stateLock) { if (connectionInvalidReason != null) { future.adaptErrorResult(connectionInvalidReason); return future; } if (request.getOID().equals(StartTLSExtendedRequest.OID)) { if (!pendingRequests.isEmpty()) { future.setResultOrError(request.getResultDecoder().newExtendedErrorResult( ResultCode.OPERATIONS_ERROR, "", "There are pending operations on this connection")); return future; } if (isTLSEnabled()) { future.setResultOrError(request.getResultDecoder().newExtendedErrorResult( ResultCode.OPERATIONS_ERROR, "", "This connection is already TLS enabled")); return future; } if (!bindOrStartTLSInProgress.compareAndSet(false, true)) { future.setResultOrError(request.getResultDecoder().newExtendedErrorResult( ResultCode.OPERATIONS_ERROR, "", "Bind or Start TLS operation in progress")); return future; } } else { if (bindOrStartTLSInProgress.get()) { future.setResultOrError(request.getResultDecoder().newExtendedErrorResult( ResultCode.OPERATIONS_ERROR, "", "Bind or Start TLS operation in progress")); return future; } } pendingRequests.put(messageID, future); } try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.extendedRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); synchronized (stateLock) { checkConnectionIsValid(); if (request.getOID().equals(StartTLSExtendedRequest.OID)) { if (!pendingRequests.isEmpty()) { future.setResultOrError(request.getResultDecoder().newExtendedErrorResult( ResultCode.OPERATIONS_ERROR, "", "There are pending operations on this connection")); return future; } else if (isTLSEnabled()) { future.setResultOrError(request.getResultDecoder().newExtendedErrorResult( ResultCode.OPERATIONS_ERROR, "", "This connection is already TLS enabled")); return future; } else if (!bindOrStartTLSInProgress.compareAndSet(false, true)) { future.setResultOrError(request.getResultDecoder().newExtendedErrorResult( ResultCode.OPERATIONS_ERROR, "", "Bind or Start TLS operation in progress")); return future; } } else { checkBindOrStartTLSInProgress(); } pendingRequests.put(messageID, future); } } catch (final IOException e) { pendingRequests.remove(messageID); bindOrStartTLSInProgress.set(false); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.adaptErrorResult(errorResult); try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.extendedRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); } } catch (final IOException e) { pendingRequests.remove(messageID); bindOrStartTLSInProgress.set(false); throw adaptRequestIOException(e); } } catch (final ErrorResultException e) { future.adaptErrorResult(e.getResult()); } return future; } /** * {@inheritDoc} */ @Override public boolean isClosed() { synchronized (stateLock) { return isClosed; @@ -487,15 +440,17 @@ /** * {@inheritDoc} */ @Override public boolean isValid() { synchronized (stateLock) { return connectionInvalidReason == null && !isClosed; return isValid0(); } } /** * {@inheritDoc} */ @Override public FutureResult<Result> modifyAsync(final ModifyRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super Result> resultHandler) { @@ -503,45 +458,34 @@ final LDAPFutureResultImpl future = new LDAPFutureResultImpl(messageID, request, resultHandler, intermediateResponseHandler, this); synchronized (stateLock) { if (connectionInvalidReason != null) { future.adaptErrorResult(connectionInvalidReason); return future; } if (bindOrStartTLSInProgress.get()) { future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR) .setDiagnosticMessage("Bind or Start TLS operation in progress")); return future; } pendingRequests.put(messageID, future); } try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.modifyRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); synchronized (stateLock) { checkConnectionIsValid(); checkBindOrStartTLSInProgress(); pendingRequests.put(messageID, future); } } catch (final IOException e) { pendingRequests.remove(messageID); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.adaptErrorResult(errorResult); try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.modifyRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); } } catch (final IOException e) { pendingRequests.remove(messageID); throw adaptRequestIOException(e); } } catch (final ErrorResultException e) { future.adaptErrorResult(e.getResult()); } return future; } /** * {@inheritDoc} */ @Override public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super Result> resultHandler) { @@ -549,53 +493,47 @@ final LDAPFutureResultImpl future = new LDAPFutureResultImpl(messageID, request, resultHandler, intermediateResponseHandler, this); synchronized (stateLock) { if (connectionInvalidReason != null) { future.adaptErrorResult(connectionInvalidReason); return future; } if (bindOrStartTLSInProgress.get()) { future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR) .setDiagnosticMessage("Bind or Start TLS operation in progress")); return future; } pendingRequests.put(messageID, future); } try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.modifyDNRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); synchronized (stateLock) { checkConnectionIsValid(); checkBindOrStartTLSInProgress(); pendingRequests.put(messageID, future); } } catch (final IOException e) { pendingRequests.remove(messageID); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.adaptErrorResult(errorResult); try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.modifyDNRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); } } catch (final IOException e) { pendingRequests.remove(messageID); throw adaptRequestIOException(e); } } catch (final ErrorResultException e) { future.adaptErrorResult(e.getResult()); } return future; } /** * {@inheritDoc} */ @Override public void removeConnectionEventListener(final ConnectionEventListener listener) { Validator.ensureNotNull(listener); listeners.remove(listener); synchronized (stateLock) { if (listeners != null) { listeners.remove(listener); } } } /** * {@inheritDoc} */ @Override public FutureResult<Result> searchAsync(final SearchRequest request, final IntermediateResponseHandler intermediateResponseHandler, final SearchResultHandler resultHandler) { @@ -603,47 +541,36 @@ final LDAPSearchFutureResultImpl future = new LDAPSearchFutureResultImpl(messageID, request, resultHandler, intermediateResponseHandler, this); synchronized (stateLock) { if (connectionInvalidReason != null) { future.adaptErrorResult(connectionInvalidReason); return future; } if (bindOrStartTLSInProgress.get()) { future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR) .setDiagnosticMessage("Bind or Start TLS operation in progress")); return future; } pendingRequests.put(messageID, future); } try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.searchRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); synchronized (stateLock) { checkConnectionIsValid(); checkBindOrStartTLSInProgress(); pendingRequests.put(messageID, future); } } catch (final IOException e) { pendingRequests.remove(messageID); // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); future.adaptErrorResult(errorResult); try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { ldapWriter.searchRequest(asn1Writer, messageID, request); connection.write(asn1Writer.getBuffer(), null); } finally { asn1Writer.recycle(); } } catch (final IOException e) { pendingRequests.remove(messageID); throw adaptRequestIOException(e); } } catch (final ErrorResultException e) { future.adaptErrorResult(e.getResult()); } return future; } /** * {@inheritDoc} */ @Override public String toString() { StringBuilder builder = new StringBuilder(); final StringBuilder builder = new StringBuilder(); builder.append("LDAPConnection("); builder.append(connection.getLocalAddress()); builder.append(','); @@ -652,23 +579,11 @@ return builder.toString(); } int continuePendingBindRequest(final LDAPBindFutureResultImpl future) throws ErrorResultException { final int newMsgID = nextMsgID.getAndIncrement(); synchronized (stateLock) { if (connectionInvalidReason != null) { throw newErrorResult(connectionInvalidReason); } pendingRequests.put(newMsgID, future); } return newMsgID; } long cancelExpiredRequests(final long currentTime) { final long timeout = options.getTimeout(TimeUnit.MILLISECONDS); long delay = timeout; if (timeout > 0) { for (int requestID : pendingRequests.keySet()) { for (final int requestID : pendingRequests.keySet()) { final AbstractLDAPFutureResultImpl<?> future = pendingRequests.get(requestID); if (future != null) { final long diff = (future.getTimestamp() + timeout) - currentTime; @@ -687,52 +602,53 @@ return delay; } /** * Closes this connection, invoking event listeners as needed. * * @param unbindRequest * The client provided unbind request if this is a client * initiated close, or {@code null} if the connection has failed. * @param isDisconnectNotification * {@code true} if this is a connection failure signalled by a * server disconnect notification. * @param reason * The result indicating why the connection was closed. */ void close(final UnbindRequest unbindRequest, final boolean isDisconnectNotification, final Result reason) { boolean notifyClose = false; boolean notifyErrorOccurred = false; final boolean notifyClose; final boolean notifyErrorOccurred; final List<ConnectionEventListener> tmpListeners; synchronized (stateLock) { if (isClosed) { // Already closed. // Already closed locally. return; } if (connectionInvalidReason != null) { // Already closed. isClosed = true; return; } if (unbindRequest != null) { // User closed. isClosed = true; } else if (unbindRequest != null) { // Local close. notifyClose = true; notifyErrorOccurred = false; isClosed = true; tmpListeners = listeners; listeners = null; // Prevent future invocations. if (connectionInvalidReason == null) { connectionInvalidReason = reason; } } else if (isFailed) { // Already failed. return; } else { // Connection has failed and this is the first indication. notifyClose = false; notifyErrorOccurred = true; } // Mark the connection as invalid. if (!isDisconnectNotification) { // Connection termination was detected locally, so use the // provided // reason for all subsequent requests. isFailed = true; failedDueToDisconnect = isDisconnectNotification; connectionInvalidReason = reason; } else { // Connection termination was triggered remotely. We don't want // to blindly pass on the result code to requests since it could // be confused for a genuine response. For example, if the // disconnect contained the invalidCredentials result code then // this could be misinterpreted as a genuine authentication // failure for subsequent bind requests. connectionInvalidReason = Responses.newResult(ResultCode.CLIENT_SIDE_SERVER_DOWN) .setDiagnosticMessage("Connection closed by server"); tmpListeners = listeners; // Keep list for client close. } } // First abort all outstanding requests. for (int requestID : pendingRequests.keySet()) { for (final int requestID : pendingRequests.keySet()) { final AbstractLDAPFutureResultImpl<?> future = pendingRequests.remove(requestID); if (future != null) { future.adaptErrorResult(connectionInvalidReason); @@ -741,7 +657,7 @@ // Now try cleanly closing the connection if possible. // Only send unbind if specified. if (unbindRequest != null && !isDisconnectNotification) { if (unbindRequest != null) { try { final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { @@ -758,18 +674,29 @@ connection.closeSilently(); // Notify listeners. if (notifyClose) { for (final ConnectionEventListener listener : listeners) { listener.handleConnectionClosed(); if (tmpListeners != null) { if (notifyErrorOccurred) { for (final ConnectionEventListener listener : tmpListeners) { // Use the reason provided in the disconnect notification. listener.handleConnectionError(isDisconnectNotification, newErrorResult(reason)); } } if (notifyClose) { for (final ConnectionEventListener listener : tmpListeners) { listener.handleConnectionClosed(); } } } } if (notifyErrorOccurred) { for (final ConnectionEventListener listener : listeners) { // Use the reason provided in the disconnect notification. listener.handleConnectionError(isDisconnectNotification, newErrorResult(reason)); } int continuePendingBindRequest(final LDAPBindFutureResultImpl future) throws ErrorResultException { final int newMsgID = nextMsgID.getAndIncrement(); synchronized (stateLock) { checkConnectionIsValid(); pendingRequests.put(newMsgID, future); } return newMsgID; } LDAPOptions getLDAPOptions() { @@ -781,13 +708,14 @@ } void handleUnsolicitedNotification(final ExtendedResult result) { if (isClosed()) { // Don't notify after connection is closed. return; final List<ConnectionEventListener> tmpListeners; synchronized (stateLock) { tmpListeners = listeners; } for (final ConnectionEventListener listener : listeners) { listener.handleUnsolicitedNotification(result); if (tmpListeners != null) { for (final ConnectionEventListener listener : tmpListeners) { listener.handleUnsolicitedNotification(result); } } } @@ -801,7 +729,7 @@ void installFilter(final Filter filter) { synchronized (stateLock) { // Determine the index where the filter should be added. FilterChain oldFilterChain = (FilterChain) connection.getProcessor(); final FilterChain oldFilterChain = (FilterChain) connection.getProcessor(); int filterIndex = oldFilterChain.size() - 1; if (filter instanceof SSLFilter) { // Beneath any ConnectionSecurityLayerFilters if present, @@ -815,7 +743,7 @@ } // Create the new filter chain. FilterChain newFilterChain = final FilterChain newFilterChain = FilterChainBuilder.stateless().addAll(oldFilterChain).add(filterIndex, filter) .build(); connection.setProcessor(newFilterChain); @@ -831,7 +759,7 @@ boolean isTLSEnabled() { synchronized (stateLock) { final FilterChain currentFilterChain = (FilterChain) connection.getProcessor(); for (Filter filter : currentFilterChain) { for (final Filter filter : currentFilterChain) { if (filter instanceof SSLFilter) { return true; } @@ -856,19 +784,56 @@ throw new IllegalStateException("TLS already enabled"); } SSLEngineConfigurator sslEngineConfigurator = final SSLEngineConfigurator sslEngineConfigurator = new SSLEngineConfigurator(sslContext, true, false, false); sslEngineConfigurator.setEnabledProtocols(protocols.isEmpty() ? null : protocols .toArray(new String[protocols.size()])); sslEngineConfigurator.setEnabledCipherSuites(cipherSuites.isEmpty() ? null : cipherSuites.toArray(new String[cipherSuites.size()])); SSLFilter sslFilter = new SSLFilter(null, sslEngineConfigurator); final SSLFilter sslFilter = new SSLFilter(null, sslEngineConfigurator); installFilter(sslFilter); sslFilter.handshake(connection, completionHandler); } } private ErrorResultException adaptRequestIOException(final IOException e) { // FIXME: what other sort of IOExceptions can be thrown? // FIXME: Is this the best result code? final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e); connectionErrorOccurred(errorResult); return newErrorResult(errorResult); } private void checkBindOrStartTLSInProgress() throws ErrorResultException { if (bindOrStartTLSInProgress.get()) { throw newErrorResult(ResultCode.OPERATIONS_ERROR, "Bind or Start TLS operation in progress"); } } private void checkConnectionIsValid() throws ErrorResultException { if (!isValid0()) { if (failedDueToDisconnect) { // Connection termination was triggered remotely. We don't want // to blindly pass on the result code to requests since it could // be confused for a genuine response. For example, if the // disconnect contained the invalidCredentials result code then // this could be misinterpreted as a genuine authentication // failure for subsequent bind requests. throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, "Connection closed by server"); } else { throw newErrorResult(connectionInvalidReason); } } } private void connectionErrorOccurred(final Result reason) { close(null, false, reason); } private boolean isValid0() { return !isFailed && !isClosed; } } opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java
@@ -33,6 +33,7 @@ import java.net.InetSocketAddress; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; @@ -89,12 +90,12 @@ final class LDAPServerFilter extends BaseFilter { private abstract class AbstractHandler<R extends Result> implements IntermediateResponseHandler, ResultHandler<R> { protected final ClientContextImpl context; protected final int messageID; protected final Connection<?> connection; protected AbstractHandler(final int messageID, final Connection<?> connection) { protected AbstractHandler(final ClientContextImpl context, final int messageID) { this.messageID = messageID; this.connection = connection; this.context = context; } @Override @@ -102,9 +103,9 @@ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { LDAP_WRITER.intermediateResponse(asn1Writer, messageID, response); connection.write(asn1Writer.getBuffer(), null); context.write(asn1Writer); } catch (final IOException ioe) { notifyConnectionException(connection, ioe); context.handleError(ioe); return false; } finally { asn1Writer.recycle(); @@ -114,8 +115,8 @@ } private final class AddHandler extends AbstractHandler<Result> { private AddHandler(final int messageID, final Connection<?> connection) { super(messageID, connection); private AddHandler(final ClientContextImpl context, final int messageID) { super(context, messageID); } @Override @@ -128,9 +129,9 @@ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { LDAP_WRITER.addResult(asn1Writer, messageID, result); connection.write(asn1Writer.getBuffer(), null); context.write(asn1Writer); } catch (final IOException ioe) { notifyConnectionException(connection, ioe); context.handleError(ioe); } finally { asn1Writer.recycle(); } @@ -138,8 +139,8 @@ } private final class BindHandler extends AbstractHandler<BindResult> { private BindHandler(final int messageID, final Connection<?> connection) { super(messageID, connection); private BindHandler(final ClientContextImpl context, final int messageID) { super(context, messageID); } @Override @@ -164,9 +165,9 @@ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { LDAP_WRITER.bindResult(asn1Writer, messageID, result); connection.write(asn1Writer.getBuffer(), null); context.write(asn1Writer); } catch (final IOException ioe) { notifyConnectionException(connection, ioe); context.handleError(ioe); } finally { asn1Writer.recycle(); } @@ -175,9 +176,7 @@ private final class ClientContextImpl implements LDAPClientContext { private final Connection<?> connection; private volatile boolean isClosed = false; private final AtomicBoolean isClosed = new AtomicBoolean(); private ServerConnection<Integer> serverConnection = null; private ClientContextImpl(final Connection<?> connection) { @@ -186,18 +185,44 @@ @Override public void disconnect() { LDAPServerFilter.notifyConnectionDisconnected(connection, null, null); disconnect0(null, null); } @Override public void disconnect(final ResultCode resultCode, final String message) { Validator.ensureNotNull(resultCode); final GenericExtendedResult notification = Responses.newGenericExtendedResult(resultCode).setOID( OID_NOTICE_OF_DISCONNECTION).setDiagnosticMessage(message); sendUnsolicitedNotification(notification); LDAPServerFilter.notifyConnectionDisconnected(connection, resultCode, message); disconnect0(resultCode, message); } @Override public void enableConnectionSecurityLayer(final ConnectionSecurityLayer layer) { synchronized (this) { installFilter(new ConnectionSecurityLayerFilter(layer, connection.getTransport() .getMemoryManager())); } } @Override public void enableTLS(final SSLContext sslContext, final String[] protocols, final String[] suites, final boolean wantClientAuth, final boolean needClientAuth) { Validator.ensureNotNull(sslContext); synchronized (this) { if (isTLSEnabled()) { throw new IllegalStateException("TLS already enabled"); } final SSLEngineConfigurator sslEngineConfigurator = new SSLEngineConfigurator(sslContext, false, false, false); sslEngineConfigurator.setEnabledCipherSuites(suites); sslEngineConfigurator.setEnabledProtocols(protocols); sslEngineConfigurator.setWantClientAuth(wantClientAuth); sslEngineConfigurator.setNeedClientAuth(needClientAuth); installFilter(new SSLFilter(sslEngineConfigurator, null)); } } @Override @@ -232,7 +257,7 @@ */ @Override public boolean isClosed() { return isClosed; return isClosed.get(); } @Override @@ -242,44 +267,18 @@ LDAP_WRITER.extendedResult(asn1Writer, 0, notification); connection.write(asn1Writer.getBuffer(), null); } catch (final IOException ioe) { LDAPServerFilter.notifyConnectionException(connection, ioe); handleError(ioe); } finally { asn1Writer.recycle(); } } @Override public void enableConnectionSecurityLayer(final ConnectionSecurityLayer layer) { synchronized (this) { installFilter(new ConnectionSecurityLayerFilter(layer, connection.getTransport() .getMemoryManager())); } } @Override public void enableTLS(final SSLContext sslContext, final String[] protocols, final String[] suites, final boolean wantClientAuth, final boolean needClientAuth) { Validator.ensureNotNull(sslContext); synchronized (this) { if (isTLSEnabled()) { throw new IllegalStateException("TLS already enabled"); } SSLEngineConfigurator sslEngineConfigurator = new SSLEngineConfigurator(sslContext, false, false, false); sslEngineConfigurator.setEnabledCipherSuites(suites); sslEngineConfigurator.setEnabledProtocols(protocols); sslEngineConfigurator.setWantClientAuth(wantClientAuth); sslEngineConfigurator.setNeedClientAuth(needClientAuth); installFilter(new SSLFilter(sslEngineConfigurator, null)); } } /** * {@inheritDoc} */ @Override public String toString() { StringBuilder builder = new StringBuilder(); final StringBuilder builder = new StringBuilder(); builder.append("LDAPClientContext("); builder.append(getLocalAddress()); builder.append(','); @@ -288,16 +287,67 @@ return builder.toString(); } private void close() { isClosed = true; public void write(final ASN1BufferWriter asn1Writer) { connection.write(asn1Writer.getBuffer(), null); } private void disconnect0(final ResultCode resultCode, final String message) { // Close this connection context. if (isClosed.compareAndSet(false, true)) { try { // Notify the server connection: it may be null if disconnect is // invoked during accept. if (serverConnection != null) { serverConnection.handleConnectionDisconnected(resultCode, message); } } finally { // Close the connection. connection.closeSilently(); } } } private ServerConnection<Integer> getServerConnection() { return serverConnection; } private void setServerConnection(final ServerConnection<Integer> serverConnection) { this.serverConnection = serverConnection; private void handleClose(final int messageID, final UnbindRequest unbindRequest) { // Close this connection context. if (isClosed.compareAndSet(false, true)) { try { // Notify the server connection: it may be null if disconnect is // invoked during accept. if (serverConnection != null) { serverConnection.handleConnectionClosed(messageID, unbindRequest); } } finally { // If this close was a result of an unbind request then the // connection won't actually be closed yet. To avoid TIME_WAIT TCP // state, let the client disconnect. if (unbindRequest != null) { return; } // Close the connection. connection.closeSilently(); } } } private void handleError(final Throwable error) { // Close this connection context. if (isClosed.compareAndSet(false, true)) { try { // Notify the server connection: it may be null if disconnect is // invoked during accept. if (serverConnection != null) { serverConnection.handleConnectionError(error); } } finally { // Close the connection. connection.closeSilently(); } } } /** @@ -313,8 +363,7 @@ int filterIndex = oldFilterChain.size() - 1; if (filter instanceof SSLFilter) { // Beneath any ConnectionSecurityLayerFilters if present, // otherwise // beneath the LDAP filter. // otherwise beneath the LDAP filter. for (int i = oldFilterChain.size() - 2; i >= 0; i--) { if (!(oldFilterChain.get(i) instanceof ConnectionSecurityLayerFilter)) { filterIndex = i + 1; @@ -339,7 +388,7 @@ private boolean isTLSEnabled() { synchronized (this) { final FilterChain currentFilterChain = (FilterChain) connection.getProcessor(); for (Filter filter : currentFilterChain) { for (final Filter filter : currentFilterChain) { if (filter instanceof SSLFilter) { return true; } @@ -348,11 +397,14 @@ } } private void setServerConnection(final ServerConnection<Integer> serverConnection) { this.serverConnection = serverConnection; } } private final class CompareHandler extends AbstractHandler<CompareResult> { private CompareHandler(final int messageID, final Connection<?> connection) { super(messageID, connection); private CompareHandler(final ClientContextImpl context, final int messageID) { super(context, messageID); } @Override @@ -377,9 +429,9 @@ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { LDAP_WRITER.compareResult(asn1Writer, messageID, result); connection.write(asn1Writer.getBuffer(), null); context.write(asn1Writer); } catch (final IOException ioe) { notifyConnectionException(connection, ioe); context.handleError(ioe); } finally { asn1Writer.recycle(); } @@ -387,8 +439,8 @@ } private final class DeleteHandler extends AbstractHandler<Result> { private DeleteHandler(final int messageID, final Connection<?> connection) { super(messageID, connection); private DeleteHandler(final ClientContextImpl context, final int messageID) { super(context, messageID); } @Override @@ -401,9 +453,9 @@ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { LDAP_WRITER.deleteResult(asn1Writer, messageID, result); connection.write(asn1Writer.getBuffer(), null); context.write(asn1Writer); } catch (final IOException ioe) { notifyConnectionException(connection, ioe); context.handleError(ioe); } finally { asn1Writer.recycle(); } @@ -411,8 +463,8 @@ } private final class ExtendedHandler<R extends ExtendedResult> extends AbstractHandler<R> { private ExtendedHandler(final int messageID, final Connection<?> connection) { super(messageID, connection); private ExtendedHandler(final ClientContextImpl context, final int messageID) { super(context, messageID); } @Override @@ -438,9 +490,9 @@ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { LDAP_WRITER.extendedResult(asn1Writer, messageID, result); connection.write(asn1Writer.getBuffer(), null); context.write(asn1Writer); } catch (final IOException ioe) { notifyConnectionException(connection, ioe); context.handleError(ioe); } finally { asn1Writer.recycle(); } @@ -448,8 +500,8 @@ } private final class ModifyDNHandler extends AbstractHandler<Result> { private ModifyDNHandler(final int messageID, final Connection<?> connection) { super(messageID, connection); private ModifyDNHandler(final ClientContextImpl context, final int messageID) { super(context, messageID); } @Override @@ -462,9 +514,9 @@ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { LDAP_WRITER.modifyDNResult(asn1Writer, messageID, result); connection.write(asn1Writer.getBuffer(), null); context.write(asn1Writer); } catch (final IOException ioe) { notifyConnectionException(connection, ioe); context.handleError(ioe); } finally { asn1Writer.recycle(); } @@ -472,8 +524,8 @@ } private final class ModifyHandler extends AbstractHandler<Result> { private ModifyHandler(final int messageID, final Connection<?> connection) { super(messageID, connection); private ModifyHandler(final ClientContextImpl context, final int messageID) { super(context, messageID); } @Override @@ -486,9 +538,9 @@ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { LDAP_WRITER.modifyResult(asn1Writer, messageID, result); connection.write(asn1Writer.getBuffer(), null); context.write(asn1Writer); } catch (final IOException ioe) { notifyConnectionException(connection, ioe); context.handleError(ioe); } finally { asn1Writer.recycle(); } @@ -497,8 +549,8 @@ private final class SearchHandler extends AbstractHandler<Result> implements SearchResultHandler { private SearchHandler(final int messageID, final Connection<?> connection) { super(messageID, connection); private SearchHandler(final ClientContextImpl context, final int messageID) { super(context, messageID); } @Override @@ -506,9 +558,9 @@ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { LDAP_WRITER.searchResultEntry(asn1Writer, messageID, entry); connection.write(asn1Writer.getBuffer(), null); context.write(asn1Writer); } catch (final IOException ioe) { notifyConnectionException(connection, ioe); context.handleError(ioe); return false; } finally { asn1Writer.recycle(); @@ -526,9 +578,9 @@ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { LDAP_WRITER.searchResultReference(asn1Writer, messageID, reference); connection.write(asn1Writer.getBuffer(), null); context.write(asn1Writer); } catch (final IOException ioe) { notifyConnectionException(connection, ioe); context.handleError(ioe); return false; } finally { asn1Writer.recycle(); @@ -541,9 +593,9 @@ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter(); try { LDAP_WRITER.searchResult(asn1Writer, messageID, result); connection.write(asn1Writer.getBuffer(), null); context.write(asn1Writer); } catch (final IOException ioe) { notifyConnectionException(connection, ioe); context.handleError(ioe); } finally { asn1Writer.recycle(); } @@ -554,6 +606,14 @@ // following RFCs: 5289, 4346, 3268,4132 and 4162. private static final Map<String, Integer> CIPHER_KEY_SIZES; private static final Attribute<ASN1BufferReader> LDAP_ASN1_READER_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("LDAPASN1Reader"); private static final Attribute<ClientContextImpl> LDAP_CONNECTION_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("LDAPServerConnection"); private static final LDAPWriter LDAP_WRITER = new LDAPWriter(); static { CIPHER_KEY_SIZES = new LinkedHashMap<String, Integer>(); CIPHER_KEY_SIZES.put("_WITH_AES_256_CBC_", 256); @@ -572,78 +632,10 @@ CIPHER_KEY_SIZES.put("_WITH_NULL_", 0); } private static final LDAPWriter LDAP_WRITER = new LDAPWriter(); private final LDAPReader ldapReader; private static final Attribute<ClientContextImpl> LDAP_CONNECTION_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("LDAPServerConnection"); private static final Attribute<ASN1BufferReader> LDAP_ASN1_READER_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("LDAPASN1Reader"); private static void notifyConnectionClosed(final Connection<?> connection, final int messageID, final UnbindRequest unbindRequest) { final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(connection); if (clientContext != null) { // Close the connection context. clientContext.close(); // Notify the server connection: it may be null if disconnect is // invoked during accept. final ServerConnection<Integer> serverConnection = clientContext.getServerConnection(); if (serverConnection != null) { serverConnection.handleConnectionClosed(messageID, unbindRequest); } // If this close was a result of an unbind request then the // connection won't actually be closed yet. To avoid TIME_WAIT TCP // state, let the client disconnect. if (unbindRequest != null) { return; } // Close the connection. connection.closeSilently(); } } private static void notifyConnectionDisconnected(final Connection<?> connection, final ResultCode resultCode, final String message) { final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(connection); if (clientContext != null) { // Close the connection context. clientContext.close(); // Notify the server connection: it may be null if disconnect is // invoked during accept. final ServerConnection<Integer> serverConnection = clientContext.getServerConnection(); if (serverConnection != null) { serverConnection.handleConnectionDisconnected(resultCode, message); } // Close the connection. connection.closeSilently(); } } private static void notifyConnectionException(final Connection<?> connection, final Throwable error) { final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(connection); if (clientContext != null) { // Close the connection context. clientContext.close(); // Notify the server connection: it may be null if disconnect is // invoked during accept. final ServerConnection<Integer> serverConnection = clientContext.getServerConnection(); if (serverConnection != null) { serverConnection.handleConnectionError(error); } // Close the connection. connection.closeSilently(); } } private final LDAPListenerImpl listener; private final int maxASN1ElementSize; private final AbstractLDAPMessageHandler<FilterChainContext> serverRequestHandler = new AbstractLDAPMessageHandler<FilterChainContext>() { @Override @@ -664,7 +656,7 @@ LDAP_CONNECTION_ATTR.get(ctx.getConnection()); if (clientContext != null) { final ServerConnection<Integer> conn = clientContext.getServerConnection(); final AddHandler handler = new AddHandler(messageID, ctx.getConnection()); final AddHandler handler = new AddHandler(clientContext, messageID); conn.handleAdd(messageID, request, handler, handler); } } @@ -677,7 +669,7 @@ LDAP_CONNECTION_ATTR.get(ctx.getConnection()); if (clientContext != null) { final ServerConnection<Integer> conn = clientContext.getServerConnection(); final BindHandler handler = new BindHandler(messageID, ctx.getConnection()); final BindHandler handler = new BindHandler(clientContext, messageID); conn.handleBind(messageID, version, bindContext, handler, handler); } } @@ -689,8 +681,7 @@ LDAP_CONNECTION_ATTR.get(ctx.getConnection()); if (clientContext != null) { final ServerConnection<Integer> conn = clientContext.getServerConnection(); final CompareHandler handler = new CompareHandler(messageID, ctx.getConnection()); final CompareHandler handler = new CompareHandler(clientContext, messageID); conn.handleCompare(messageID, request, handler, handler); } } @@ -702,8 +693,7 @@ LDAP_CONNECTION_ATTR.get(ctx.getConnection()); if (clientContext != null) { final ServerConnection<Integer> conn = clientContext.getServerConnection(); final DeleteHandler handler = new DeleteHandler(messageID, ctx.getConnection()); final DeleteHandler handler = new DeleteHandler(clientContext, messageID); conn.handleDelete(messageID, request, handler, handler); } } @@ -717,7 +707,7 @@ if (clientContext != null) { final ServerConnection<Integer> conn = clientContext.getServerConnection(); final ExtendedHandler<R> handler = new ExtendedHandler<R>(messageID, ctx.getConnection()); new ExtendedHandler<R>(clientContext, messageID); conn.handleExtendedRequest(messageID, request, handler, handler); } } @@ -730,7 +720,7 @@ if (clientContext != null) { final ServerConnection<Integer> conn = clientContext.getServerConnection(); final ModifyDNHandler handler = new ModifyDNHandler(messageID, ctx.getConnection()); new ModifyDNHandler(clientContext, messageID); conn.handleModifyDN(messageID, request, handler, handler); } } @@ -742,8 +732,7 @@ LDAP_CONNECTION_ATTR.get(ctx.getConnection()); if (clientContext != null) { final ServerConnection<Integer> conn = clientContext.getServerConnection(); final ModifyHandler handler = new ModifyHandler(messageID, ctx.getConnection()); final ModifyHandler handler = new ModifyHandler(clientContext, messageID); conn.handleModify(messageID, request, handler, handler); } } @@ -755,8 +744,7 @@ LDAP_CONNECTION_ATTR.get(ctx.getConnection()); if (clientContext != null) { final ServerConnection<Integer> conn = clientContext.getServerConnection(); final SearchHandler handler = new SearchHandler(messageID, ctx.getConnection()); final SearchHandler handler = new SearchHandler(clientContext, messageID); conn.handleSearch(messageID, request, handler, handler); } } @@ -764,23 +752,23 @@ @Override public void unbindRequest(final FilterChainContext ctx, final int messageID, final UnbindRequest request) { notifyConnectionClosed(ctx.getConnection(), messageID, request); // Remove the client context causing any subsequent LDAP // traffic to be ignored. final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection()); if (clientContext != null) { clientContext.handleClose(messageID, request); } } @Override public void unrecognizedMessage(final FilterChainContext ctx, final int messageID, final byte messageTag, final ByteString messageBytes) { notifyConnectionException(ctx.getConnection(), new UnsupportedMessageException( messageID, messageTag, messageBytes)); exceptionOccurred(ctx, new UnsupportedMessageException(messageID, messageTag, messageBytes)); } }; private final int maxASN1ElementSize; private final LDAPReader ldapReader; private final LDAPListenerImpl listener; LDAPServerFilter(final LDAPListenerImpl listener, final LDAPReader ldapReader, final int maxASN1ElementSize) { this.listener = listener; @@ -790,7 +778,10 @@ @Override public void exceptionOccurred(final FilterChainContext ctx, final Throwable error) { notifyConnectionException(ctx.getConnection(), error); final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection()); if (clientContext != null) { clientContext.handleError(error); } } @Override @@ -812,7 +803,10 @@ @Override public NextAction handleClose(final FilterChainContext ctx) throws IOException { notifyConnectionClosed(ctx.getConnection(), -1, null); final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection()); if (clientContext != null) { clientContext.handleClose(-1, null); } return ctx.getStopAction(); } opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -22,13 +22,15 @@ * * * Copyright 2010 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS * Portions copyright 2011-2012 ForgeRock AS */ package org.forgerock.opendj.ldap; import static org.fest.assertions.Assertions.assertThat; import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -37,16 +39,22 @@ import java.net.InetSocketAddress; import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import javax.net.ssl.SSLContext; import org.forgerock.opendj.ldap.requests.BindRequest; import org.forgerock.opendj.ldap.requests.DigestMD5SASLBindRequest; import org.forgerock.opendj.ldap.requests.Requests; import org.forgerock.opendj.ldap.requests.SearchRequest; import org.forgerock.opendj.ldap.responses.BindResult; import org.forgerock.opendj.ldap.responses.Responses; import org.forgerock.opendj.ldap.responses.SearchResultEntry; import org.forgerock.opendj.ldap.schema.Schema; import org.forgerock.opendj.ldap.schema.SchemaBuilder; @@ -65,6 +73,10 @@ */ @SuppressWarnings("javadoc") public class ConnectionFactoryTestCase extends SdkTestCase { // Test timeout in ms for tests which need to wait for network events. private static final long TEST_TIMEOUT = 30L; private static final long TEST_TIMEOUT_MS = TEST_TIMEOUT * 1000L; class MyResultHandler implements ResultHandler<Connection> { // latch. private final CountDownLatch latch; @@ -114,8 +126,8 @@ StaticUtils.DEBUG_LOG.setLevel(Level.INFO); } @DataProvider(name = "connectionFactories") public Object[][] getConnectionFactories() throws Exception { @DataProvider Object[][] connectionFactories() throws Exception { Object[][] factories = new Object[21][1]; // HeartBeatConnectionFactory @@ -429,4 +441,228 @@ assertThat(realConnectionIsClosed[2]).isTrue(); assertThat(realConnectionIsClosed[3]).isTrue(); } private static final class CloseNotify { private boolean closeOnAccept; private boolean doBindFirst; private boolean useEventListener; private boolean sendDisconnectNotification; private CloseNotify(boolean closeOnAccept, boolean doBindFirst, boolean useEventListener, boolean sendDisconnectNotification) { this.closeOnAccept = closeOnAccept; this.doBindFirst = doBindFirst; this.useEventListener = useEventListener; this.sendDisconnectNotification = sendDisconnectNotification; } @Override public String toString() { StringBuilder builder = new StringBuilder(); builder.append("["); if (closeOnAccept) { builder.append(" closeOnAccept"); } if (doBindFirst) { builder.append(" doBindFirst"); } if (useEventListener) { builder.append(" useEventListener"); } if (sendDisconnectNotification) { builder.append(" sendDisconnectNotification"); } builder.append(" ]"); return builder.toString(); } } @DataProvider Object[][] closeNotifyConfig() { // @formatter:off return new Object[][] { // closeOnAccept, doBindFirst, useEventListener, sendDisconnectNotification // Close on accept. { new CloseNotify(true, false, false, false) }, { new CloseNotify(true, false, true, false) }, // Use disconnect. { new CloseNotify(false, false, false, false) }, { new CloseNotify(false, false, false, true) }, { new CloseNotify(false, false, true, false) }, { new CloseNotify(false, false, true, true) }, { new CloseNotify(false, true, false, false) }, { new CloseNotify(false, true, false, true) }, { new CloseNotify(false, true, true, false) }, { new CloseNotify(false, true, true, true) }, }; // @formatter:on } @SuppressWarnings("unchecked") @Test(dataProvider = "closeNotifyConfig") public void testCloseNotify(final CloseNotify config) throws Exception { final CountDownLatch connectLatch = new CountDownLatch(1); final AtomicReference<LDAPClientContext> contextHolder = new AtomicReference<LDAPClientContext>(); final ServerConnectionFactory<LDAPClientContext, Integer> mockServer = mock(ServerConnectionFactory.class); when(mockServer.handleAccept(any(LDAPClientContext.class))).thenAnswer( new Answer<ServerConnection<Integer>>() { public ServerConnection<Integer> answer(InvocationOnMock invocation) throws Throwable { // Allow the context to be accessed from outside the mock. contextHolder.set((LDAPClientContext) invocation.getArguments()[0]); connectLatch.countDown(); /* is this needed? */ if (config.closeOnAccept) { throw newErrorResult(ResultCode.UNAVAILABLE); } else { // Return a mock connection which always succeeds for binds. ServerConnection<Integer> mockConnection = mock(ServerConnection.class); doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { ResultHandler<? super BindResult> resultHandler = (ResultHandler<? super BindResult>) invocation .getArguments()[4]; resultHandler.handleResult(Responses .newBindResult(ResultCode.SUCCESS)); return null; } }).when(mockConnection).handleBind(anyInt(), anyInt(), any(BindRequest.class), any(IntermediateResponseHandler.class), any(ResultHandler.class)); return mockConnection; } } }); final int port = TestCaseUtils.findFreePort(); LDAPListener listener = new LDAPListener(port, mockServer); try { LDAPConnectionFactory clientFactory = new LDAPConnectionFactory("localhost", port); final Connection client = clientFactory.getConnection(); connectLatch.await(TEST_TIMEOUT, TimeUnit.SECONDS); MockConnectionEventListener mockListener = null; try { if (config.useEventListener) { mockListener = new MockConnectionEventListener(); client.addConnectionEventListener(mockListener); } if (config.doBindFirst) { client.bind("cn=test", "password".toCharArray()); } if (!config.closeOnAccept) { // Disconnect using client context. LDAPClientContext context = contextHolder.get(); assertThat(context).isNotNull(); assertThat(context.isClosed()).isFalse(); if (config.sendDisconnectNotification) { context.disconnect(ResultCode.BUSY, "busy"); } else { context.disconnect(); } assertThat(context.isClosed()).isTrue(); } // Block until remote close is signalled. if (mockListener != null) { // Block using listener. mockListener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS); assertThat(mockListener.getInvocationCount()).isEqualTo(1); assertThat(mockListener.isDisconnectNotification()).isEqualTo( config.sendDisconnectNotification); assertThat(mockListener.getError()).isNotNull(); } else { // Block by spinning on isValid. waitForCondition(new Callable<Boolean>() { @Override public Boolean call() throws Exception { return !client.isValid(); } }); } assertThat(client.isValid()).isFalse(); assertThat(client.isClosed()).isFalse(); } finally { client.close(); } // Check state after remote close and local close. assertThat(client.isValid()).isFalse(); assertThat(client.isClosed()).isTrue(); if (mockListener != null) { mockListener.awaitClose(TEST_TIMEOUT, TimeUnit.SECONDS); assertThat(mockListener.getInvocationCount()).isEqualTo(2); } } finally { listener.close(); } } @SuppressWarnings("unchecked") @Test public void testUnsolicitedNotifications() throws Exception { final CountDownLatch connectLatch = new CountDownLatch(1); final AtomicReference<LDAPClientContext> contextHolder = new AtomicReference<LDAPClientContext>(); final ServerConnectionFactory<LDAPClientContext, Integer> mockServer = mock(ServerConnectionFactory.class); when(mockServer.handleAccept(any(LDAPClientContext.class))).thenAnswer( new Answer<ServerConnection<Integer>>() { public ServerConnection<Integer> answer(InvocationOnMock invocation) throws Throwable { // Allow the context to be accessed from outside the mock. contextHolder.set((LDAPClientContext) invocation.getArguments()[0]); connectLatch.countDown(); /* is this needed? */ return mock(ServerConnection.class); } }); final int port = TestCaseUtils.findFreePort(); LDAPListener listener = new LDAPListener(port, mockServer); try { LDAPConnectionFactory clientFactory = new LDAPConnectionFactory("localhost", port); final Connection client = clientFactory.getConnection(); connectLatch.await(TEST_TIMEOUT, TimeUnit.SECONDS); try { MockConnectionEventListener mockListener = new MockConnectionEventListener(); client.addConnectionEventListener(mockListener); // Send notification. LDAPClientContext context = contextHolder.get(); assertThat(context).isNotNull(); context.sendUnsolicitedNotification(Responses.newGenericExtendedResult( ResultCode.OTHER).setOID("1.2.3.4")); assertThat(context.isClosed()).isFalse(); // Block using listener. mockListener.awaitNotification(TEST_TIMEOUT, TimeUnit.SECONDS); assertThat(mockListener.getInvocationCount()).isEqualTo(1); assertThat(mockListener.getNotification()).isNotNull(); assertThat(mockListener.getNotification().getResultCode()).isEqualTo( ResultCode.OTHER); assertThat(mockListener.getNotification().getOID()).isEqualTo("1.2.3.4"); assertThat(client.isValid()).isTrue(); assertThat(client.isClosed()).isFalse(); } finally { client.close(); } } finally { listener.close(); } } private void waitForCondition(Callable<Boolean> condition) throws Exception { long timeout = System.currentTimeMillis() + TEST_TIMEOUT_MS; while (!condition.call()) { Thread.yield(); if (System.currentTimeMillis() > timeout) { throw new TimeoutException("Test timed out after " + TEST_TIMEOUT + " seconds"); } } } } opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
@@ -40,7 +40,6 @@ import org.forgerock.opendj.ldap.requests.ExtendedRequest; import org.forgerock.opendj.ldap.requests.ModifyDNRequest; import org.forgerock.opendj.ldap.requests.ModifyRequest; import org.forgerock.opendj.ldap.requests.Requests; import org.forgerock.opendj.ldap.requests.SearchRequest; import org.forgerock.opendj.ldap.requests.UnbindRequest; import org.forgerock.opendj.ldap.responses.BindResult; @@ -60,30 +59,6 @@ */ public class LDAPListenerTestCase extends SdkTestCase { private static class MockConnectionEventListener implements ConnectionEventListener { final CountDownLatch closeLatch = new CountDownLatch(1); String errorMessage = null; @Override public void handleConnectionClosed() { errorMessage = "Unexpected call to handleConnectionClosed"; closeLatch.countDown(); } @Override public void handleConnectionError(final boolean isDisconnectNotification, final ErrorResultException error) { errorMessage = "Unexpected call to handleConnectionError"; closeLatch.countDown(); } @Override public void handleUnsolicitedNotification(final ExtendedResult notification) { errorMessage = "Unexpected call to handleUnsolicitedNotification"; closeLatch.countDown(); } } private static class MockServerConnection implements ServerConnection<Integer> { volatile LDAPClientContext context = null; final CountDownLatch isConnected = new CountDownLatch(1); @@ -256,180 +231,6 @@ } /** * Tests connection event listener. * * @throws Exception * If an unexpected error occurred. */ @Test public void testConnectionEventListenerClose() throws Exception { final MockServerConnection onlineServerConnection = new MockServerConnection(); final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(onlineServerConnection); final LDAPListener onlineServerListener = new LDAPListener("localhost", TestCaseUtils.findFreePort(), onlineServerConnectionFactory); final Connection connection; try { // Connect and bind. connection = new LDAPConnectionFactory(onlineServerListener.getSocketAddress()) .getConnection(); final MockConnectionEventListener listener = new MockConnectionEventListener() { @Override public void handleConnectionClosed() { closeLatch.countDown(); } }; connection.addConnectionEventListener(listener); Assert.assertEquals(listener.closeLatch.getCount(), 1); connection.close(); listener.closeLatch.await(); Assert.assertNull(listener.errorMessage); } finally { onlineServerListener.close(); } } /** * Tests connection event listener. * * @throws Exception * If an unexpected error occurred. */ @Test(enabled = false) public void testConnectionEventListenerDisconnect() throws Exception { final MockServerConnection onlineServerConnection = new MockServerConnection(); final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(onlineServerConnection); final LDAPListener onlineServerListener = new LDAPListener("localhost", TestCaseUtils.findFreePort(), onlineServerConnectionFactory); final Connection connection; try { // Connect and bind. connection = new LDAPConnectionFactory(onlineServerListener.getSocketAddress()) .getConnection(); final MockConnectionEventListener listener = new MockConnectionEventListener() { @Override public void handleConnectionError(final boolean isDisconnectNotification, final ErrorResultException error) { if (isDisconnectNotification) { errorMessage = "Unexpected disconnect notification"; } closeLatch.countDown(); } }; connection.addConnectionEventListener(listener); Assert.assertEquals(listener.closeLatch.getCount(), 1); Assert.assertTrue(onlineServerConnection.isConnected.await(10, TimeUnit.SECONDS)); onlineServerConnection.context.disconnect(); listener.closeLatch.await(); Assert.assertNull(listener.errorMessage); connection.close(); } finally { onlineServerListener.close(); } } /** * Tests connection event listener. * * @throws Exception * If an unexpected error occurred. */ @Test public void testConnectionEventListenerDisconnectNotification() throws Exception { final MockServerConnection onlineServerConnection = new MockServerConnection(); final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(onlineServerConnection); final LDAPListener onlineServerListener = new LDAPListener("localhost", TestCaseUtils.findFreePort(), onlineServerConnectionFactory); final Connection connection; try { // Connect and bind. connection = new LDAPConnectionFactory(onlineServerListener.getSocketAddress()) .getConnection(); final MockConnectionEventListener listener = new MockConnectionEventListener() { @Override public void handleConnectionError(final boolean isDisconnectNotification, final ErrorResultException error) { if (!isDisconnectNotification || !error.getResult().getResultCode().equals(ResultCode.BUSY) || !error.getResult().getDiagnosticMessage().equals("test")) { errorMessage = "Missing disconnect notification: " + error; } closeLatch.countDown(); } }; connection.addConnectionEventListener(listener); Assert.assertEquals(listener.closeLatch.getCount(), 1); Assert.assertTrue(onlineServerConnection.isConnected.await(10, TimeUnit.SECONDS)); onlineServerConnection.context.disconnect(ResultCode.BUSY, "test"); listener.closeLatch.await(); Assert.assertNull(listener.errorMessage); connection.close(); } finally { onlineServerListener.close(); } } /** * Tests connection event listener. * * @throws Exception * If an unexpected error occurred. */ @Test public void testConnectionEventListenerUnbind() throws Exception { final MockServerConnection onlineServerConnection = new MockServerConnection(); final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(onlineServerConnection); final LDAPListener onlineServerListener = new LDAPListener("localhost", TestCaseUtils.findFreePort(), onlineServerConnectionFactory); final Connection connection; try { // Connect and bind. connection = new LDAPConnectionFactory(onlineServerListener.getSocketAddress()) .getConnection(); final MockConnectionEventListener listener = new MockConnectionEventListener() { @Override public void handleConnectionClosed() { closeLatch.countDown(); } }; connection.addConnectionEventListener(listener); Assert.assertEquals(listener.closeLatch.getCount(), 1); connection.close(Requests.newUnbindRequest(), "called from unit test"); listener.closeLatch.await(); Assert.assertNull(listener.errorMessage); } finally { onlineServerListener.close(); } } /** * Tests basic LDAP listener functionality. * * @throws Exception opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockConnectionEventListener.java
New file @@ -0,0 +1,112 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2012 ForgeRock AS. */ package org.forgerock.opendj.ldap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.forgerock.opendj.ldap.responses.ExtendedResult; /** * A connection event listener which records events and signals when it has been * notified. */ final class MockConnectionEventListener implements ConnectionEventListener { private final CountDownLatch closedLatch = new CountDownLatch(1); private final CountDownLatch errorLatch = new CountDownLatch(1); private final CountDownLatch notificationLatch = new CountDownLatch(1); private Boolean isDisconnectNotification = null; private ErrorResultException error = null; private ExtendedResult notification = null; private final AtomicInteger invocationCount = new AtomicInteger(); /** * {@inheritDoc} */ @Override public void handleConnectionClosed() { invocationCount.incrementAndGet(); closedLatch.countDown(); } /** * {@inheritDoc} */ @Override public void handleConnectionError(boolean isDisconnectNotification, ErrorResultException error) { this.isDisconnectNotification = isDisconnectNotification; this.error = error; invocationCount.incrementAndGet(); errorLatch.countDown(); } /** * {@inheritDoc} */ @Override public void handleUnsolicitedNotification(ExtendedResult notification) { this.notification = notification; invocationCount.incrementAndGet(); notificationLatch.countDown(); } void awaitClose(long timeout, TimeUnit unit) { await(closedLatch, timeout, unit); } void awaitError(long timeout, TimeUnit unit) { await(errorLatch, timeout, unit); } void awaitNotification(long timeout, TimeUnit unit) { await(notificationLatch, timeout, unit); } Boolean isDisconnectNotification() { return isDisconnectNotification; } ErrorResultException getError() { return error; } ExtendedResult getNotification() { return notification; } private void await(CountDownLatch latch, long timeout, TimeUnit unit) { try { latch.await(timeout, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } int getInvocationCount() { return invocationCount.get(); } }