From bdc5fa0dd0980eb9e077ae80644504705a24e035 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 83 ++++++++++++++++++++---------------------
1 files changed, 40 insertions(+), 43 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index af9a708..2cb5125 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -54,14 +54,14 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
+import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.opends.server.api.ClientConnection;
import org.opends.server.api.ConnectionHandler;
import org.opends.server.core.AbandonOperationBasis;
@@ -123,7 +123,7 @@
* instance of the LDAP connection handler and have its requests decoded by an LDAP request handler.
*/
public final class LDAPClientConnection2 extends ClientConnection implements TLSCapableConnection,
- ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>> {
+ ReactiveHandler<QueueingStrategy, LdapRequestEnvelope, Stream<Response>> {
private static final String REACTIVE_OUT = "reactive.out";
/** The tracer object for the debug logger. */
@@ -230,9 +230,9 @@
}
connectionID = DirectoryServer.newConnectionAccepted(this);
- clientContext.onDisconnect(new DisconnectListener() {
+ clientContext.addConnectionEventListener(new ConnectionEventListener() {
@Override
- public void exceptionOccurred(LDAPClientContext context, Throwable error) {
+ public void handleConnectionError(LDAPClientContext context, Throwable error) {
if (error instanceof LocalizableException) {
disconnect(
DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
@@ -242,13 +242,13 @@
}
@Override
- public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
+ public void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode,
String diagnosticMessage) {
disconnect(DisconnectReason.SERVER_ERROR, false, null);
}
@Override
- public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
+ public void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
}
});
@@ -372,7 +372,7 @@
*/
@Override
public boolean isSecure() {
- return false;
+ return clientContext.getSSLSession() != null || clientContext.getSASLServer() != null;
}
/**
@@ -405,7 +405,7 @@
// an error result to the client indicating that a problem occurred.
if (removeOperationInProgress(operation.getMessageID())) {
final Response response = operationToResponse(operation);
- final FlowableEmitter<Response> out = getOut(operation);
+ final FlowableEmitter<Response> out = getAttachedEmitter(operation);
if (response != null) {
out.onNext(response);
}
@@ -534,22 +534,18 @@
* The search result entry to be sent to the client
*/
@Override
- public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry) {
- getEmitter(searchOperation).onNext(toResponse(searchEntry));
+ public void sendSearchEntry(final SearchOperation searchOperation, final SearchResultEntry searchEntry) {
+ getAttachedEmitter(searchOperation).onNext(toResponse(searchEntry));
}
- private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation) {
- return getOut(searchOperation);
- }
-
- private Response toResponse(SearchResultEntry searchEntry) {
- return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
- }
-
- private FlowableEmitter<Response> getOut(Operation operation) {
+ private FlowableEmitter<Response> getAttachedEmitter(final Operation operation) {
return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
}
+ private Response toResponse(final SearchResultEntry searchEntry) {
+ return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
+ }
+
/**
* Sends the provided search result reference to the client.
*
@@ -572,7 +568,7 @@
return false;
}
- final FlowableEmitter<Response> out = getOut(searchOperation);
+ final FlowableEmitter<Response> out = getAttachedEmitter(searchOperation);
out.onNext(Converters.from(searchReference));
return true;
@@ -589,7 +585,7 @@
@Override
protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse) {
final Operation operation = intermediateResponse.getOperation();
- final FlowableEmitter<Response> out = getOut(operation);
+ final FlowableEmitter<Response> emitter = getAttachedEmitter(operation);
final Response response = Responses.newGenericIntermediateResponse(intermediateResponse.getOID(),
intermediateResponse.getValue());
@@ -597,7 +593,7 @@
response.addControl(Converters.from(control));
}
- out.onNext(response);
+ emitter.onNext(response);
// The only reason we shouldn't continue processing is if the
// connection is closed.
@@ -962,14 +958,15 @@
* a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
*/
@Override
- public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
- return streamFromPublisher(
- new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
+ public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) {
+ return streamFromPublisher(new BlockingBackpressureSubscription(
+ Flowable.create(new FlowableOnSubscribe<Response>() {
@Override
public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
}
- }, BackpressureStrategy.ERROR))).onNextDo(new Consumer<Response>() {
+ }, BackpressureStrategy.ERROR)))
+ .onNext(new Consumer<Response>() {
@Override
public void accept(final Response response) throws Exception {
if (keepStats) {
@@ -1003,7 +1000,7 @@
}
}
- private final byte toLdapResponseType(final LdapRawMessage rawRequest, final Response response) {
+ private final byte toLdapResponseType(final LdapRequestEnvelope rawRequest, final Response response) {
if (response instanceof Result) {
return toLdapResultType(rawRequest.getMessageType());
}
@@ -1160,8 +1157,8 @@
final List<Control> controls, final FlowableEmitter<Response> out) {
if (ldapVersion == 2 && !controls.isEmpty()) {
// LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
disconnectControlsNotAllowed();
return false;
@@ -1229,17 +1226,17 @@
versionString = "2";
if (!connectionHandler.allowLDAPv2()) {
- out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
- disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
+ disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
return false;
}
if (!controls.isEmpty()) {
// LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
disconnectControlsNotAllowed();
return false;
@@ -1390,8 +1387,8 @@
final List<Control> controls, final FlowableEmitter<Response> out) {
if (ldapVersion == 2 && !controls.isEmpty()) {
// LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
disconnectControlsNotAllowed();
return false;
@@ -1514,8 +1511,8 @@
final List<Control> controls, final FlowableEmitter<Response> out) {
if (ldapVersion == 2 && !controls.isEmpty()) {
// LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
disconnectControlsNotAllowed();
return false;
@@ -1569,8 +1566,8 @@
final List<Control> controls, final FlowableEmitter<Response> out) {
if (ldapVersion == 2 && !controls.isEmpty()) {
// LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
disconnectControlsNotAllowed();
return false;
@@ -1626,8 +1623,8 @@
final List<Control> controls, final FlowableEmitter<Response> out) {
if (ldapVersion == 2 && !controls.isEmpty()) {
// LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
disconnectControlsNotAllowed();
return false;
--
Gitblit v1.10.0