| | |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.DN; |
| | | import org.forgerock.opendj.ldap.LDAPClientContext; |
| | | import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener; |
| | | import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener; |
| | | import org.forgerock.opendj.ldap.ResultCode; |
| | | import org.forgerock.opendj.ldap.requests.UnbindRequest; |
| | | import org.forgerock.opendj.ldap.responses.CompareResult; |
| | | import org.forgerock.opendj.ldap.responses.Response; |
| | | import org.forgerock.opendj.ldap.responses.Responses; |
| | | import org.forgerock.opendj.ldap.responses.Result; |
| | | import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage; |
| | | import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope; |
| | | import org.opends.server.api.ClientConnection; |
| | | import org.opends.server.api.ConnectionHandler; |
| | | import org.opends.server.core.AbandonOperationBasis; |
| | |
| | | * instance of the LDAP connection handler and have its requests decoded by an LDAP request handler. |
| | | */ |
| | | public final class LDAPClientConnection2 extends ClientConnection implements TLSCapableConnection, |
| | | ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>> { |
| | | ReactiveHandler<QueueingStrategy, LdapRequestEnvelope, Stream<Response>> { |
| | | private static final String REACTIVE_OUT = "reactive.out"; |
| | | |
| | | /** The tracer object for the debug logger. */ |
| | |
| | | } |
| | | |
| | | connectionID = DirectoryServer.newConnectionAccepted(this); |
| | | clientContext.onDisconnect(new DisconnectListener() { |
| | | clientContext.addConnectionEventListener(new ConnectionEventListener() { |
| | | @Override |
| | | public void exceptionOccurred(LDAPClientContext context, Throwable error) { |
| | | public void handleConnectionError(LDAPClientContext context, Throwable error) { |
| | | if (error instanceof LocalizableException) { |
| | | disconnect( |
| | | DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject()); |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, |
| | | public void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode, |
| | | String diagnosticMessage) { |
| | | disconnect(DisconnectReason.SERVER_ERROR, false, null); |
| | | } |
| | | |
| | | @Override |
| | | public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) { |
| | | public void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) { |
| | | disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null); |
| | | } |
| | | }); |
| | |
| | | */ |
| | | @Override |
| | | public boolean isSecure() { |
| | | return false; |
| | | return clientContext.getSSLSession() != null || clientContext.getSASLServer() != null; |
| | | } |
| | | |
| | | /** |
| | |
| | | // an error result to the client indicating that a problem occurred. |
| | | if (removeOperationInProgress(operation.getMessageID())) { |
| | | final Response response = operationToResponse(operation); |
| | | final FlowableEmitter<Response> out = getOut(operation); |
| | | final FlowableEmitter<Response> out = getAttachedEmitter(operation); |
| | | if (response != null) { |
| | | out.onNext(response); |
| | | } |
| | |
| | | * The search result entry to be sent to the client |
| | | */ |
| | | @Override |
| | | public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry) { |
| | | getEmitter(searchOperation).onNext(toResponse(searchEntry)); |
| | | public void sendSearchEntry(final SearchOperation searchOperation, final SearchResultEntry searchEntry) { |
| | | getAttachedEmitter(searchOperation).onNext(toResponse(searchEntry)); |
| | | } |
| | | |
| | | private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation) { |
| | | return getOut(searchOperation); |
| | | } |
| | | |
| | | private Response toResponse(SearchResultEntry searchEntry) { |
| | | return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion)); |
| | | } |
| | | |
| | | private FlowableEmitter<Response> getOut(Operation operation) { |
| | | private FlowableEmitter<Response> getAttachedEmitter(final Operation operation) { |
| | | return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT)); |
| | | } |
| | | |
| | | private Response toResponse(final SearchResultEntry searchEntry) { |
| | | return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion)); |
| | | } |
| | | |
| | | /** |
| | | * Sends the provided search result reference to the client. |
| | | * |
| | |
| | | return false; |
| | | } |
| | | |
| | | final FlowableEmitter<Response> out = getOut(searchOperation); |
| | | final FlowableEmitter<Response> out = getAttachedEmitter(searchOperation); |
| | | out.onNext(Converters.from(searchReference)); |
| | | |
| | | return true; |
| | |
| | | @Override |
| | | protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse) { |
| | | final Operation operation = intermediateResponse.getOperation(); |
| | | final FlowableEmitter<Response> out = getOut(operation); |
| | | final FlowableEmitter<Response> emitter = getAttachedEmitter(operation); |
| | | |
| | | final Response response = Responses.newGenericIntermediateResponse(intermediateResponse.getOID(), |
| | | intermediateResponse.getValue()); |
| | |
| | | response.addControl(Converters.from(control)); |
| | | } |
| | | |
| | | out.onNext(response); |
| | | emitter.onNext(response); |
| | | |
| | | // The only reason we shouldn't continue processing is if the |
| | | // connection is closed. |
| | |
| | | * a fatal error and the client has been disconnected as a result, or if the client unbound from the server. |
| | | */ |
| | | @Override |
| | | public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) { |
| | | return streamFromPublisher( |
| | | new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() { |
| | | public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) { |
| | | return streamFromPublisher(new BlockingBackpressureSubscription( |
| | | Flowable.create(new FlowableOnSubscribe<Response>() { |
| | | @Override |
| | | public void subscribe(FlowableEmitter<Response> emitter) throws Exception { |
| | | processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter); |
| | | } |
| | | }, BackpressureStrategy.ERROR))).onNextDo(new Consumer<Response>() { |
| | | }, BackpressureStrategy.ERROR))) |
| | | .onNext(new Consumer<Response>() { |
| | | @Override |
| | | public void accept(final Response response) throws Exception { |
| | | if (keepStats) { |
| | |
| | | } |
| | | } |
| | | |
| | | private final byte toLdapResponseType(final LdapRawMessage rawRequest, final Response response) { |
| | | private final byte toLdapResponseType(final LdapRequestEnvelope rawRequest, final Response response) { |
| | | if (response instanceof Result) { |
| | | return toLdapResultType(rawRequest.getMessageType()); |
| | | } |
| | |
| | | final List<Control> controls, final FlowableEmitter<Response> out) { |
| | | if (ldapVersion == 2 && !controls.isEmpty()) { |
| | | // LDAPv2 clients aren't allowed to send controls. |
| | | out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage( |
| | | ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString())); |
| | | out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR) |
| | | .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onComplete(); |
| | | disconnectControlsNotAllowed(); |
| | | return false; |
| | |
| | | versionString = "2"; |
| | | |
| | | if (!connectionHandler.allowLDAPv2()) { |
| | | out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage( |
| | | ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString())); |
| | | out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR) |
| | | .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onComplete(); |
| | | disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get()); |
| | | disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get()); |
| | | return false; |
| | | } |
| | | |
| | | if (!controls.isEmpty()) { |
| | | // LDAPv2 clients aren't allowed to send controls. |
| | | out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage( |
| | | ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR) |
| | | .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onComplete(); |
| | | disconnectControlsNotAllowed(); |
| | | return false; |
| | |
| | | final List<Control> controls, final FlowableEmitter<Response> out) { |
| | | if (ldapVersion == 2 && !controls.isEmpty()) { |
| | | // LDAPv2 clients aren't allowed to send controls. |
| | | out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage( |
| | | ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR) |
| | | .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onComplete(); |
| | | disconnectControlsNotAllowed(); |
| | | return false; |
| | |
| | | final List<Control> controls, final FlowableEmitter<Response> out) { |
| | | if (ldapVersion == 2 && !controls.isEmpty()) { |
| | | // LDAPv2 clients aren't allowed to send controls. |
| | | out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage( |
| | | ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR) |
| | | .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onComplete(); |
| | | disconnectControlsNotAllowed(); |
| | | return false; |
| | |
| | | final List<Control> controls, final FlowableEmitter<Response> out) { |
| | | if (ldapVersion == 2 && !controls.isEmpty()) { |
| | | // LDAPv2 clients aren't allowed to send controls. |
| | | out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage( |
| | | ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR) |
| | | .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onComplete(); |
| | | disconnectControlsNotAllowed(); |
| | | return false; |
| | |
| | | final List<Control> controls, final FlowableEmitter<Response> out) { |
| | | if (ldapVersion == 2 && !controls.isEmpty()) { |
| | | // LDAPv2 clients aren't allowed to send controls. |
| | | out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage( |
| | | ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR) |
| | | .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onComplete(); |
| | | disconnectControlsNotAllowed(); |
| | | return false; |