From 86ad6a08499797f9b3204896caee947abb03394f Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 390 +++++++++++++++++++++++++++++++++++++++++++++++++------
1 files changed, 346 insertions(+), 44 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 593a92b..efe9929 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -17,22 +17,53 @@
package org.forgerock.opendj.reactive;
import static com.forgerock.reactive.RxJavaStreams.*;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_INTERMEDIATE_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_DONE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_ENTRY;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_REFERENCE;
import static org.opends.messages.CoreMessages.*;
import static org.opends.messages.ProtocolMessages.*;
import static org.opends.server.loggers.AccessLogger.logDisconnect;
import static org.opends.server.protocols.ldap.LDAPConstants.*;
import static org.opends.server.util.ServerConstants.OID_START_TLS_REQUEST;
-import static org.opends.server.util.StaticUtils.getExceptionMessage;
+import static org.opends.server.util.StaticUtils.*;
import java.net.InetAddress;
+import java.nio.channels.ClosedChannelException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.security.sasl.SaslServer;
import org.forgerock.i18n.LocalizableException;
import org.forgerock.i18n.LocalizableMessage;
@@ -93,12 +124,15 @@
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchResultReference;
import org.opends.server.util.TimeThread;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
-import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
@@ -502,7 +536,7 @@
}
// Controls are not allowed for LDAPv2 clients.
- if (ldapVersion != 2) {
+ if (ldapVersion != 2 && operation.getResponseControls() != null) {
for (Control control : operation.getResponseControls()) {
result.addControl(Converters.from(control));
}
@@ -529,7 +563,7 @@
}
private Response toResponse(SearchResultEntry searchEntry) {
- return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
+ return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
}
private FlowableEmitter<Response> getOut(Operation operation) {
@@ -949,12 +983,60 @@
*/
@Override
public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
- return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
- @Override
- public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
- processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
- }
- }, BackpressureStrategy.NONE).onBackpressureBuffer(64, null, BackpressureOverflowStrategy.ERROR)));
+ return singleFrom(streamFromPublisher(
+ new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
+ @Override
+ public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
+ processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
+ }
+ }, BackpressureStrategy.ERROR))).onNextDo(new Consumer<Response>() {
+ @Override
+ public void accept(final Response response) throws Exception {
+ if (keepStats) {
+ statTracker.updateMessageWritten(
+ toLdapResponseType(message, response), message.getMessageId());
+ }
+ }
+ }));
+ }
+
+ private final byte toLdapResultType(final byte requestType) {
+ switch (requestType) {
+ case OP_TYPE_ADD_REQUEST:
+ return OP_TYPE_ADD_RESPONSE;
+ case OP_TYPE_BIND_REQUEST:
+ return OP_TYPE_BIND_RESPONSE;
+ case OP_TYPE_COMPARE_REQUEST:
+ return OP_TYPE_COMPARE_RESPONSE;
+ case OP_TYPE_DELETE_REQUEST:
+ return OP_TYPE_DELETE_RESPONSE;
+ case OP_TYPE_EXTENDED_REQUEST:
+ return OP_TYPE_EXTENDED_RESPONSE;
+ case OP_TYPE_MODIFY_DN_REQUEST:
+ return OP_TYPE_MODIFY_DN_RESPONSE;
+ case OP_TYPE_MODIFY_REQUEST:
+ return OP_TYPE_MODIFY_RESPONSE;
+ case OP_TYPE_SEARCH_REQUEST:
+ return OP_TYPE_SEARCH_RESULT_DONE;
+ default:
+ throw new IllegalArgumentException("Unknown request: " + requestType);
+ }
+ }
+
+ private final byte toLdapResponseType(final LdapRawMessage rawRequest, final Response response) {
+ if (response instanceof Result) {
+ return toLdapResultType(rawRequest.getMessageType());
+ }
+ if (response instanceof org.forgerock.opendj.ldap.responses.IntermediateResponse) {
+ return OP_TYPE_INTERMEDIATE_RESPONSE;
+ }
+ if (response instanceof org.forgerock.opendj.ldap.responses.SearchResultEntry) {
+ return OP_TYPE_SEARCH_RESULT_ENTRY;
+ }
+ if (response instanceof org.forgerock.opendj.ldap.responses.SearchResultReference) {
+ return OP_TYPE_SEARCH_RESULT_REFERENCE;
+ }
+ throw new IllegalArgumentException();
}
private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
@@ -1116,10 +1198,20 @@
} catch (DirectoryException de) {
logger.traceException(de);
- final Result result = Responses.newResult(de.getResultCode())
- .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
- for (String referral : de.getReferralURLs()) {
- result.addReferralURI(referral);
+ final Result result = Responses.newResult(de.getResultCode());
+ if (de.getLocalizedMessage() != null) {
+ result.setDiagnosticMessage(de.getLocalizedMessage());
+ }
+ if (de.getMatchedDN() != null) {
+ result.setMatchedDN(de.getMatchedDN().toString());
+ }
+ if (de.getReferralURLs() != null) {
+ result.getReferralURIs().addAll(de.getReferralURLs());
+ }
+ if (ldapVersion != 2 && addOp.getResponseControls() != null) {
+ for (Control control : addOp.getResponseControls()) {
+ result.addControl(Converters.from(control));
+ }
}
out.onNext(result);
@@ -1216,12 +1308,21 @@
} catch (DirectoryException de) {
logger.traceException(de);
- final Result result = Responses.newBindResult(de.getResultCode())
- .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
- for (String referral : de.getReferralURLs()) {
- result.addReferralURI(referral);
+ final Result result = Responses.newBindResult(de.getResultCode());
+ if (de.getLocalizedMessage() != null) {
+ result.setDiagnosticMessage(de.getLocalizedMessage());
}
-
+ if (de.getMatchedDN() != null) {
+ result.setMatchedDN(de.getMatchedDN().toString());
+ }
+ if (de.getReferralURLs() != null) {
+ result.getReferralURIs().addAll(de.getReferralURLs());
+ }
+ if (ldapVersion != 2 && bindOp.getResponseControls() != null) {
+ for (Control control : bindOp.getResponseControls()) {
+ result.addControl(Converters.from(control));
+ }
+ }
out.onNext(result);
out.onComplete();
@@ -1252,8 +1353,8 @@
final List<Control> controls, final FlowableEmitter<Response> out) {
if (ldapVersion == 2 && !controls.isEmpty()) {
// LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newCompareResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
disconnectControlsNotAllowed();
return false;
@@ -1271,9 +1372,21 @@
} catch (DirectoryException de) {
logger.traceException(de);
- final CompareResult result = Responses.newCompareResult(de.getResultCode())
- .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
+ final CompareResult result = Responses.newCompareResult(de.getResultCode());
+ if (de.getLocalizedMessage() != null) {
+ result.setDiagnosticMessage(de.getLocalizedMessage());
+ }
+ if (de.getMatchedDN() != null) {
+ result.setMatchedDN(de.getMatchedDN().toString());
+ }
+ if (de.getReferralURLs() != null) {
+ result.getReferralURIs().addAll(de.getReferralURLs());
+ }
+ if (ldapVersion != 2 && compareOp.getResponseControls() != null) {
+ for (Control control : compareOp.getResponseControls()) {
+ result.addControl(Converters.from(control));
+ }
+ }
out.onNext(result);
out.onComplete();
}
@@ -1315,9 +1428,21 @@
} catch (DirectoryException de) {
logger.traceException(de);
- final Result result = Responses.newResult(de.getResultCode())
- .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
+ final Result result = Responses.newResult(de.getResultCode());
+ if (de.getLocalizedMessage() != null) {
+ result.setDiagnosticMessage(de.getLocalizedMessage());
+ }
+ if (de.getMatchedDN() != null) {
+ result.setMatchedDN(de.getMatchedDN().toString());
+ }
+ if (de.getReferralURLs() != null) {
+ result.getReferralURIs().addAll(de.getReferralURLs());
+ }
+ if (ldapVersion != 2 && deleteOp.getResponseControls() != null) {
+ for (Control control : deleteOp.getResponseControls()) {
+ result.addControl(Converters.from(control));
+ }
+ }
out.onNext(result);
out.onComplete();
@@ -1371,10 +1496,21 @@
addOperationInProgress(queueingStrategy, extendedOp);
} catch (DirectoryException de) {
logger.traceException(de);
- final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
- .setMatchedDN(de.getMatchedDN().toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
-
+ final Result result = Responses.newGenericExtendedResult(de.getResultCode());
+ if (de.getLocalizedMessage() != null) {
+ result.setDiagnosticMessage(de.getLocalizedMessage());
+ }
+ if (de.getMatchedDN() != null) {
+ result.setMatchedDN(de.getMatchedDN().toString());
+ }
+ if (de.getReferralURLs() != null) {
+ result.getReferralURIs().addAll(de.getReferralURLs());
+ }
+ if (ldapVersion != 2 && extendedOp.getResponseControls() != null) {
+ for (Control control : extendedOp.getResponseControls()) {
+ result.addControl(Converters.from(control));
+ }
+ }
out.onNext(result);
out.onComplete();
}
@@ -1415,10 +1551,21 @@
addOperationInProgress(queueingStrategy, modifyOp);
} catch (DirectoryException de) {
logger.traceException(de);
- final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
- .setMatchedDN(de.getMatchedDN().toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
-
+ final Result result = Responses.newResult(de.getResultCode());
+ if (de.getLocalizedMessage() != null) {
+ result.setDiagnosticMessage(de.getLocalizedMessage());
+ }
+ if (de.getMatchedDN() != null) {
+ result.setMatchedDN(de.getMatchedDN().toString());
+ }
+ if (de.getReferralURLs() != null) {
+ result.getReferralURIs().addAll(de.getReferralURLs());
+ }
+ if (ldapVersion != 2 && modifyOp.getResponseControls() != null) {
+ for (Control control : modifyOp.getResponseControls()) {
+ result.addControl(Converters.from(control));
+ }
+ }
out.onNext(result);
out.onComplete();
}
@@ -1461,11 +1608,20 @@
} catch (DirectoryException de) {
logger.traceException(de);
- final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
- .setMatchedDN(de.getMatchedDN().toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
- for (Control control : modifyDNOp.getResponseControls()) {
- result.addControl(Converters.from(control));
+ final Result result = Responses.newResult(de.getResultCode());
+ if (de.getLocalizedMessage() != null) {
+ result.setDiagnosticMessage(de.getLocalizedMessage());
+ }
+ if (de.getMatchedDN() != null) {
+ result.setMatchedDN(de.getMatchedDN().toString());
+ }
+ if (de.getReferralURLs() != null) {
+ result.getReferralURIs().addAll(de.getReferralURLs());
+ }
+ if (ldapVersion != 2 && modifyDNOp.getResponseControls() != null) {
+ for (Control control : modifyDNOp.getResponseControls()) {
+ result.addControl(Converters.from(control));
+ }
}
out.onNext(result);
out.onComplete();
@@ -1520,7 +1676,7 @@
if (de.getReferralURLs() != null) {
result.getReferralURIs().addAll(de.getReferralURLs());
}
- if (searchOp.getResponseControls() != null) {
+ if (ldapVersion != 2 && searchOp.getResponseControls() != null) {
for (Control control : searchOp.getResponseControls()) {
result.addControl(Converters.from(control));
}
@@ -1611,8 +1767,34 @@
}
@Override
- public boolean prepareTLS(LocalizableMessageBuilder unavailableReason) {
- throw new UnsupportedOperationException();
+ public boolean prepareTLS(final LocalizableMessageBuilder unavailableReason) {
+ // Make sure that the connection handler allows the use of the
+ // StartTLS operation.
+ if (!connectionHandler.allowStartTLS()) {
+ unavailableReason.append(ERR_LDAP_TLS_STARTTLS_NOT_ALLOWED.get());
+ return false;
+ }
+ try {
+ if (!clientContext.enableTLS(connectionHandler.createSSLEngine(), true)) {
+ unavailableReason.append(ERR_LDAP_TLS_EXISTING_SECURITY_PROVIDER.get(SSLEngine.class.getName()));
+ return false;
+ }
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+ unavailableReason.append(ERR_LDAP_TLS_CANNOT_CREATE_TLS_PROVIDER.get(stackTraceToSingleLineString(de)));
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Installs the SASL security layer on the underlying connection.
+ *
+ * @param saslServer
+ * The {@code SaslServer} which should be used to secure the conneciton.
+ */
+ public void enableSASL(final SaslServer saslServer) {
+ clientContext.enableSASL(saslServer);
}
/**
@@ -1639,11 +1821,131 @@
* @return The array of certificates associated with a connection.
*/
public Certificate[] getClientCertificateChain() {
+ final SSLSession sslSession = clientContext.getSSLSession();
+ if (sslSession != null) {
+ try {
+ return sslSession.getPeerCertificates();
+ } catch (SSLPeerUnverifiedException e) {
+ logger.traceException(e);
+ }
+ }
return new Certificate[0];
}
@Override
public int getSSF() {
- return 0;
+ return clientContext.getSecurityStrengthFactor();
+ }
+
+ /** Upstream -> BlockingBackpressureSubscription -> Downstream */
+ private final class BlockingBackpressureSubscription
+ implements Subscription, Publisher<Response>, Subscriber<Response> {
+ private long pendingRequests;
+ private final Queue<Response> queue = new LinkedList<>();
+ private final Lock lock = new ReentrantLock();
+ private final Condition spaceAvailable = lock.newCondition();
+ private final Publisher<Response> upstream;
+ private final long writeTimeoutMillis;
+ private Subscription subscription;
+ private Subscriber<? super Response> downstream;
+
+ BlockingBackpressureSubscription(final Publisher<Response> upstream) {
+ this.upstream = upstream;
+ this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
+ ? 30000 // Do not wait indefinitely,
+ : connectionHandler.getMaxBlockedWriteTimeLimit();
+ }
+
+ @Override
+ public void subscribe(final Subscriber<? super Response> subscriber) {
+ if (downstream != null) {
+ return;
+ }
+ downstream = subscriber;
+ subscriber.onSubscribe(this);
+ upstream.subscribe(this);
+ }
+
+ @Override
+ public void onSubscribe(final Subscription s) {
+ if ( subscription != null) {
+ s.cancel();
+ return;
+ }
+ subscription = s;
+ subscription.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void request(long n) {
+ lock.lock();
+ try {
+ if (pendingRequests != Long.MIN_VALUE) {
+ pendingRequests += n;
+ drain();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void drain() {
+ Response response;
+ try {
+ while (pendingRequests > 0 && (response = queue.poll()) != null) {
+ downstream.onNext(response);
+ // Forward response
+ pendingRequests--;
+ }
+ } finally {
+ spaceAvailable.signalAll();
+ }
+ }
+
+ @Override
+ public void onNext(final Response response) {
+ lock.lock();
+ try {
+ while (queue.size() >= 32) {
+ try {
+ if (!spaceAvailable.await(writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
+ // If we've gotten here, then the write timed out.
+ downstream.onError(new ClosedChannelException());
+ cancel();
+ return;
+ }
+ } catch (InterruptedException e) {
+ downstream.onError(e);
+ cancel();
+ return;
+ }
+ }
+ queue.add(response);
+ drain();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ downstream.onError(t);
+ cancel();
+ }
+
+ @Override
+ public void onComplete() {
+ downstream.onComplete();
+ cancel();
+ }
+
+ @Override
+ public void cancel() {
+ if (subscription != null) {
+ subscription.cancel();
+ }
+ queue.clear();
+ pendingRequests = Long.MIN_VALUE;
+ }
}
}
--
Gitblit v1.10.0