| | |
| | | package org.forgerock.opendj.reactive; |
| | | |
| | | import static com.forgerock.reactive.RxJavaStreams.*; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_INTERMEDIATE_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_DONE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_ENTRY; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_REFERENCE; |
| | | import static org.opends.messages.CoreMessages.*; |
| | | import static org.opends.messages.ProtocolMessages.*; |
| | | import static org.opends.server.loggers.AccessLogger.logDisconnect; |
| | | import static org.opends.server.protocols.ldap.LDAPConstants.*; |
| | | import static org.opends.server.util.ServerConstants.OID_START_TLS_REQUEST; |
| | | import static org.opends.server.util.StaticUtils.getExceptionMessage; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.net.InetAddress; |
| | | import java.nio.channels.ClosedChannelException; |
| | | import java.nio.channels.Selector; |
| | | import java.nio.channels.SocketChannel; |
| | | import java.security.cert.Certificate; |
| | | import java.util.Collection; |
| | | import java.util.Iterator; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.Queue; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | import java.util.concurrent.locks.Condition; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | |
| | | import javax.net.ssl.SSLEngine; |
| | | import javax.net.ssl.SSLPeerUnverifiedException; |
| | | import javax.net.ssl.SSLSession; |
| | | import javax.security.sasl.SaslServer; |
| | | |
| | | import org.forgerock.i18n.LocalizableException; |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | |
| | | import org.opends.server.types.SearchResultEntry; |
| | | import org.opends.server.types.SearchResultReference; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.reactivestreams.Publisher; |
| | | import org.reactivestreams.Subscriber; |
| | | import org.reactivestreams.Subscription; |
| | | |
| | | import com.forgerock.reactive.Consumer; |
| | | import com.forgerock.reactive.ReactiveHandler; |
| | | import com.forgerock.reactive.Single; |
| | | import com.forgerock.reactive.Stream; |
| | | |
| | | import io.reactivex.BackpressureOverflowStrategy; |
| | | import io.reactivex.BackpressureStrategy; |
| | | import io.reactivex.Flowable; |
| | | import io.reactivex.FlowableEmitter; |
| | |
| | | } |
| | | |
| | | // Controls are not allowed for LDAPv2 clients. |
| | | if (ldapVersion != 2) { |
| | | if (ldapVersion != 2 && operation.getResponseControls() != null) { |
| | | for (Control control : operation.getResponseControls()) { |
| | | result.addControl(Converters.from(control)); |
| | | } |
| | |
| | | } |
| | | |
| | | private Response toResponse(SearchResultEntry searchEntry) { |
| | | return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry)); |
| | | return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion)); |
| | | } |
| | | |
| | | private FlowableEmitter<Response> getOut(Operation operation) { |
| | |
| | | */ |
| | | @Override |
| | | public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) { |
| | | return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() { |
| | | @Override |
| | | public void subscribe(FlowableEmitter<Response> emitter) throws Exception { |
| | | processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter); |
| | | } |
| | | }, BackpressureStrategy.NONE).onBackpressureBuffer(64, null, BackpressureOverflowStrategy.ERROR))); |
| | | return singleFrom(streamFromPublisher( |
| | | new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() { |
| | | @Override |
| | | public void subscribe(FlowableEmitter<Response> emitter) throws Exception { |
| | | processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter); |
| | | } |
| | | }, BackpressureStrategy.ERROR))).onNextDo(new Consumer<Response>() { |
| | | @Override |
| | | public void accept(final Response response) throws Exception { |
| | | if (keepStats) { |
| | | statTracker.updateMessageWritten( |
| | | toLdapResponseType(message, response), message.getMessageId()); |
| | | } |
| | | } |
| | | })); |
| | | } |
| | | |
| | | private final byte toLdapResultType(final byte requestType) { |
| | | switch (requestType) { |
| | | case OP_TYPE_ADD_REQUEST: |
| | | return OP_TYPE_ADD_RESPONSE; |
| | | case OP_TYPE_BIND_REQUEST: |
| | | return OP_TYPE_BIND_RESPONSE; |
| | | case OP_TYPE_COMPARE_REQUEST: |
| | | return OP_TYPE_COMPARE_RESPONSE; |
| | | case OP_TYPE_DELETE_REQUEST: |
| | | return OP_TYPE_DELETE_RESPONSE; |
| | | case OP_TYPE_EXTENDED_REQUEST: |
| | | return OP_TYPE_EXTENDED_RESPONSE; |
| | | case OP_TYPE_MODIFY_DN_REQUEST: |
| | | return OP_TYPE_MODIFY_DN_RESPONSE; |
| | | case OP_TYPE_MODIFY_REQUEST: |
| | | return OP_TYPE_MODIFY_RESPONSE; |
| | | case OP_TYPE_SEARCH_REQUEST: |
| | | return OP_TYPE_SEARCH_RESULT_DONE; |
| | | default: |
| | | throw new IllegalArgumentException("Unknown request: " + requestType); |
| | | } |
| | | } |
| | | |
| | | private final byte toLdapResponseType(final LdapRawMessage rawRequest, final Response response) { |
| | | if (response instanceof Result) { |
| | | return toLdapResultType(rawRequest.getMessageType()); |
| | | } |
| | | if (response instanceof org.forgerock.opendj.ldap.responses.IntermediateResponse) { |
| | | return OP_TYPE_INTERMEDIATE_RESPONSE; |
| | | } |
| | | if (response instanceof org.forgerock.opendj.ldap.responses.SearchResultEntry) { |
| | | return OP_TYPE_SEARCH_RESULT_ENTRY; |
| | | } |
| | | if (response instanceof org.forgerock.opendj.ldap.responses.SearchResultReference) { |
| | | return OP_TYPE_SEARCH_RESULT_REFERENCE; |
| | | } |
| | | throw new IllegalArgumentException(); |
| | | } |
| | | |
| | | private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message, |
| | |
| | | } catch (DirectoryException de) { |
| | | logger.traceException(de); |
| | | |
| | | final Result result = Responses.newResult(de.getResultCode()) |
| | | .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString()); |
| | | for (String referral : de.getReferralURLs()) { |
| | | result.addReferralURI(referral); |
| | | final Result result = Responses.newResult(de.getResultCode()); |
| | | if (de.getLocalizedMessage() != null) { |
| | | result.setDiagnosticMessage(de.getLocalizedMessage()); |
| | | } |
| | | if (de.getMatchedDN() != null) { |
| | | result.setMatchedDN(de.getMatchedDN().toString()); |
| | | } |
| | | if (de.getReferralURLs() != null) { |
| | | result.getReferralURIs().addAll(de.getReferralURLs()); |
| | | } |
| | | if (ldapVersion != 2 && addOp.getResponseControls() != null) { |
| | | for (Control control : addOp.getResponseControls()) { |
| | | result.addControl(Converters.from(control)); |
| | | } |
| | | } |
| | | |
| | | out.onNext(result); |
| | |
| | | } catch (DirectoryException de) { |
| | | logger.traceException(de); |
| | | |
| | | final Result result = Responses.newBindResult(de.getResultCode()) |
| | | .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString()); |
| | | for (String referral : de.getReferralURLs()) { |
| | | result.addReferralURI(referral); |
| | | final Result result = Responses.newBindResult(de.getResultCode()); |
| | | if (de.getLocalizedMessage() != null) { |
| | | result.setDiagnosticMessage(de.getLocalizedMessage()); |
| | | } |
| | | |
| | | if (de.getMatchedDN() != null) { |
| | | result.setMatchedDN(de.getMatchedDN().toString()); |
| | | } |
| | | if (de.getReferralURLs() != null) { |
| | | result.getReferralURIs().addAll(de.getReferralURLs()); |
| | | } |
| | | if (ldapVersion != 2 && bindOp.getResponseControls() != null) { |
| | | for (Control control : bindOp.getResponseControls()) { |
| | | result.addControl(Converters.from(control)); |
| | | } |
| | | } |
| | | out.onNext(result); |
| | | out.onComplete(); |
| | | |
| | |
| | | final List<Control> controls, final FlowableEmitter<Response> out) { |
| | | if (ldapVersion == 2 && !controls.isEmpty()) { |
| | | // LDAPv2 clients aren't allowed to send controls. |
| | | out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage( |
| | | ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onNext(Responses.newCompareResult(ResultCode.PROTOCOL_ERROR) |
| | | .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString())); |
| | | out.onComplete(); |
| | | disconnectControlsNotAllowed(); |
| | | return false; |
| | |
| | | } catch (DirectoryException de) { |
| | | logger.traceException(de); |
| | | |
| | | final CompareResult result = Responses.newCompareResult(de.getResultCode()) |
| | | .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString()); |
| | | result.getReferralURIs().addAll(de.getReferralURLs()); |
| | | final CompareResult result = Responses.newCompareResult(de.getResultCode()); |
| | | if (de.getLocalizedMessage() != null) { |
| | | result.setDiagnosticMessage(de.getLocalizedMessage()); |
| | | } |
| | | if (de.getMatchedDN() != null) { |
| | | result.setMatchedDN(de.getMatchedDN().toString()); |
| | | } |
| | | if (de.getReferralURLs() != null) { |
| | | result.getReferralURIs().addAll(de.getReferralURLs()); |
| | | } |
| | | if (ldapVersion != 2 && compareOp.getResponseControls() != null) { |
| | | for (Control control : compareOp.getResponseControls()) { |
| | | result.addControl(Converters.from(control)); |
| | | } |
| | | } |
| | | out.onNext(result); |
| | | out.onComplete(); |
| | | } |
| | |
| | | } catch (DirectoryException de) { |
| | | logger.traceException(de); |
| | | |
| | | final Result result = Responses.newResult(de.getResultCode()) |
| | | .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString()); |
| | | result.getReferralURIs().addAll(de.getReferralURLs()); |
| | | final Result result = Responses.newResult(de.getResultCode()); |
| | | if (de.getLocalizedMessage() != null) { |
| | | result.setDiagnosticMessage(de.getLocalizedMessage()); |
| | | } |
| | | if (de.getMatchedDN() != null) { |
| | | result.setMatchedDN(de.getMatchedDN().toString()); |
| | | } |
| | | if (de.getReferralURLs() != null) { |
| | | result.getReferralURIs().addAll(de.getReferralURLs()); |
| | | } |
| | | if (ldapVersion != 2 && deleteOp.getResponseControls() != null) { |
| | | for (Control control : deleteOp.getResponseControls()) { |
| | | result.addControl(Converters.from(control)); |
| | | } |
| | | } |
| | | |
| | | out.onNext(result); |
| | | out.onComplete(); |
| | |
| | | addOperationInProgress(queueingStrategy, extendedOp); |
| | | } catch (DirectoryException de) { |
| | | logger.traceException(de); |
| | | final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage()) |
| | | .setMatchedDN(de.getMatchedDN().toString()); |
| | | result.getReferralURIs().addAll(de.getReferralURLs()); |
| | | |
| | | final Result result = Responses.newGenericExtendedResult(de.getResultCode()); |
| | | if (de.getLocalizedMessage() != null) { |
| | | result.setDiagnosticMessage(de.getLocalizedMessage()); |
| | | } |
| | | if (de.getMatchedDN() != null) { |
| | | result.setMatchedDN(de.getMatchedDN().toString()); |
| | | } |
| | | if (de.getReferralURLs() != null) { |
| | | result.getReferralURIs().addAll(de.getReferralURLs()); |
| | | } |
| | | if (ldapVersion != 2 && extendedOp.getResponseControls() != null) { |
| | | for (Control control : extendedOp.getResponseControls()) { |
| | | result.addControl(Converters.from(control)); |
| | | } |
| | | } |
| | | out.onNext(result); |
| | | out.onComplete(); |
| | | } |
| | |
| | | addOperationInProgress(queueingStrategy, modifyOp); |
| | | } catch (DirectoryException de) { |
| | | logger.traceException(de); |
| | | final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage()) |
| | | .setMatchedDN(de.getMatchedDN().toString()); |
| | | result.getReferralURIs().addAll(de.getReferralURLs()); |
| | | |
| | | final Result result = Responses.newResult(de.getResultCode()); |
| | | if (de.getLocalizedMessage() != null) { |
| | | result.setDiagnosticMessage(de.getLocalizedMessage()); |
| | | } |
| | | if (de.getMatchedDN() != null) { |
| | | result.setMatchedDN(de.getMatchedDN().toString()); |
| | | } |
| | | if (de.getReferralURLs() != null) { |
| | | result.getReferralURIs().addAll(de.getReferralURLs()); |
| | | } |
| | | if (ldapVersion != 2 && modifyOp.getResponseControls() != null) { |
| | | for (Control control : modifyOp.getResponseControls()) { |
| | | result.addControl(Converters.from(control)); |
| | | } |
| | | } |
| | | out.onNext(result); |
| | | out.onComplete(); |
| | | } |
| | |
| | | } catch (DirectoryException de) { |
| | | logger.traceException(de); |
| | | |
| | | final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage()) |
| | | .setMatchedDN(de.getMatchedDN().toString()); |
| | | result.getReferralURIs().addAll(de.getReferralURLs()); |
| | | for (Control control : modifyDNOp.getResponseControls()) { |
| | | result.addControl(Converters.from(control)); |
| | | final Result result = Responses.newResult(de.getResultCode()); |
| | | if (de.getLocalizedMessage() != null) { |
| | | result.setDiagnosticMessage(de.getLocalizedMessage()); |
| | | } |
| | | if (de.getMatchedDN() != null) { |
| | | result.setMatchedDN(de.getMatchedDN().toString()); |
| | | } |
| | | if (de.getReferralURLs() != null) { |
| | | result.getReferralURIs().addAll(de.getReferralURLs()); |
| | | } |
| | | if (ldapVersion != 2 && modifyDNOp.getResponseControls() != null) { |
| | | for (Control control : modifyDNOp.getResponseControls()) { |
| | | result.addControl(Converters.from(control)); |
| | | } |
| | | } |
| | | out.onNext(result); |
| | | out.onComplete(); |
| | |
| | | if (de.getReferralURLs() != null) { |
| | | result.getReferralURIs().addAll(de.getReferralURLs()); |
| | | } |
| | | if (searchOp.getResponseControls() != null) { |
| | | if (ldapVersion != 2 && searchOp.getResponseControls() != null) { |
| | | for (Control control : searchOp.getResponseControls()) { |
| | | result.addControl(Converters.from(control)); |
| | | } |
| | |
| | | } |
| | | |
| | | @Override |
| | | public boolean prepareTLS(LocalizableMessageBuilder unavailableReason) { |
| | | throw new UnsupportedOperationException(); |
| | | public boolean prepareTLS(final LocalizableMessageBuilder unavailableReason) { |
| | | // Make sure that the connection handler allows the use of the |
| | | // StartTLS operation. |
| | | if (!connectionHandler.allowStartTLS()) { |
| | | unavailableReason.append(ERR_LDAP_TLS_STARTTLS_NOT_ALLOWED.get()); |
| | | return false; |
| | | } |
| | | try { |
| | | if (!clientContext.enableTLS(connectionHandler.createSSLEngine(), true)) { |
| | | unavailableReason.append(ERR_LDAP_TLS_EXISTING_SECURITY_PROVIDER.get(SSLEngine.class.getName())); |
| | | return false; |
| | | } |
| | | } catch (DirectoryException de) { |
| | | logger.traceException(de); |
| | | unavailableReason.append(ERR_LDAP_TLS_CANNOT_CREATE_TLS_PROVIDER.get(stackTraceToSingleLineString(de))); |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Installs the SASL security layer on the underlying connection. |
| | | * |
| | | * @param saslServer |
| | | * The {@code SaslServer} which should be used to secure the conneciton. |
| | | */ |
| | | public void enableSASL(final SaslServer saslServer) { |
| | | clientContext.enableSASL(saslServer); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return The array of certificates associated with a connection. |
| | | */ |
| | | public Certificate[] getClientCertificateChain() { |
| | | final SSLSession sslSession = clientContext.getSSLSession(); |
| | | if (sslSession != null) { |
| | | try { |
| | | return sslSession.getPeerCertificates(); |
| | | } catch (SSLPeerUnverifiedException e) { |
| | | logger.traceException(e); |
| | | } |
| | | } |
| | | return new Certificate[0]; |
| | | } |
| | | |
| | | @Override |
| | | public int getSSF() { |
| | | return 0; |
| | | return clientContext.getSecurityStrengthFactor(); |
| | | } |
| | | |
| | | /** Upstream -> BlockingBackpressureSubscription -> Downstream */ |
| | | private final class BlockingBackpressureSubscription |
| | | implements Subscription, Publisher<Response>, Subscriber<Response> { |
| | | private long pendingRequests; |
| | | private final Queue<Response> queue = new LinkedList<>(); |
| | | private final Lock lock = new ReentrantLock(); |
| | | private final Condition spaceAvailable = lock.newCondition(); |
| | | private final Publisher<Response> upstream; |
| | | private final long writeTimeoutMillis; |
| | | private Subscription subscription; |
| | | private Subscriber<? super Response> downstream; |
| | | |
| | | BlockingBackpressureSubscription(final Publisher<Response> upstream) { |
| | | this.upstream = upstream; |
| | | this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0 |
| | | ? 30000 // Do not wait indefinitely, |
| | | : connectionHandler.getMaxBlockedWriteTimeLimit(); |
| | | } |
| | | |
| | | @Override |
| | | public void subscribe(final Subscriber<? super Response> subscriber) { |
| | | if (downstream != null) { |
| | | return; |
| | | } |
| | | downstream = subscriber; |
| | | subscriber.onSubscribe(this); |
| | | upstream.subscribe(this); |
| | | } |
| | | |
| | | @Override |
| | | public void onSubscribe(final Subscription s) { |
| | | if ( subscription != null) { |
| | | s.cancel(); |
| | | return; |
| | | } |
| | | subscription = s; |
| | | subscription.request(Long.MAX_VALUE); |
| | | } |
| | | |
| | | @Override |
| | | public void request(long n) { |
| | | lock.lock(); |
| | | try { |
| | | if (pendingRequests != Long.MIN_VALUE) { |
| | | pendingRequests += n; |
| | | drain(); |
| | | } |
| | | } finally { |
| | | lock.unlock(); |
| | | } |
| | | } |
| | | |
| | | private void drain() { |
| | | Response response; |
| | | try { |
| | | while (pendingRequests > 0 && (response = queue.poll()) != null) { |
| | | downstream.onNext(response); |
| | | // Forward response |
| | | pendingRequests--; |
| | | } |
| | | } finally { |
| | | spaceAvailable.signalAll(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void onNext(final Response response) { |
| | | lock.lock(); |
| | | try { |
| | | while (queue.size() >= 32) { |
| | | try { |
| | | if (!spaceAvailable.await(writeTimeoutMillis, TimeUnit.MILLISECONDS)) { |
| | | // If we've gotten here, then the write timed out. |
| | | downstream.onError(new ClosedChannelException()); |
| | | cancel(); |
| | | return; |
| | | } |
| | | } catch (InterruptedException e) { |
| | | downstream.onError(e); |
| | | cancel(); |
| | | return; |
| | | } |
| | | } |
| | | queue.add(response); |
| | | drain(); |
| | | } finally { |
| | | lock.unlock(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void onError(Throwable t) { |
| | | downstream.onError(t); |
| | | cancel(); |
| | | } |
| | | |
| | | @Override |
| | | public void onComplete() { |
| | | downstream.onComplete(); |
| | | cancel(); |
| | | } |
| | | |
| | | @Override |
| | | public void cancel() { |
| | | if (subscription != null) { |
| | | subscription.cancel(); |
| | | } |
| | | queue.clear(); |
| | | pendingRequests = Long.MIN_VALUE; |
| | | } |
| | | } |
| | | } |