From bdc5fa0dd0980eb9e077ae80644504705a24e035 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java | 317 ++++++++++
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java | 64 -
opendj-core/clirr-ignored-api-changes.xml | 50
opendj-ldap-toolkit/src/test/java/com/forgerock/opendj/ldap/tools/ToolLdapServer.java | 14
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java | 10
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java | 83 +
opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java | 13
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java | 24
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java | 10
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java | 28
opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java | 14
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java | 11
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java | 2
opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java | 23
opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnection.java | 3
opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java | 26
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java | 23
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 83 +-
opendj-core/src/main/java/com/forgerock/reactive/Completable.java | 20
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java | 30
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java | 9
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java | 116 ++-
opendj-core/src/main/java/com/forgerock/reactive/Action.java | 2
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java | 30
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java | 222 +++----
opendj-core/src/main/java/org/forgerock/opendj/ldap/CommonLDAPOptions.java | 2
opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java | 10
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java | 17
/dev/null | 154 -----
opendj-core/src/main/java/com/forgerock/reactive/Consumer.java | 6
opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java | 15
opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnectionFactory.java | 3
opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java | 148 ----
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java | 2
opendj-core/src/main/java/com/forgerock/reactive/Single.java | 29
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionTestCase.java | 22
opendj-core/src/main/java/com/forgerock/reactive/Stream.java | 17
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java | 83 +-
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java | 39
39 files changed, 964 insertions(+), 810 deletions(-)
diff --git a/opendj-core/clirr-ignored-api-changes.xml b/opendj-core/clirr-ignored-api-changes.xml
index aeb2f55..6a629c1 100644
--- a/opendj-core/clirr-ignored-api-changes.xml
+++ b/opendj-core/clirr-ignored-api-changes.xml
@@ -153,6 +153,12 @@
</difference>
<difference>
<className>org/forgerock/opendj/ldap/LDAPClientContext</className>
+ <differenceType>7012</differenceType>
+ <method>javax.security.sasl.SaslServer getSASLServer()</method>
+ <justification>Simplify management of security layer</justification>
+ </difference>
+ <difference>
+ <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
<differenceType>7004</differenceType>
<method>void enableTLS(javax.net.ssl.SSLContext, java.lang.String[], java.lang.String[], boolean, boolean)</method>
<justification>Simplify management of security layer</justification>
@@ -167,27 +173,15 @@
<difference>
<className>org/forgerock/opendj/ldap/LDAPClientContext</className>
<differenceType>7012</differenceType>
- <method>void onDisconnect(org.forgerock.opendj.ldap.LDAPClientContext$DisconnectListener)</method>
+ <method>void addConnectionEventListener(org.forgerock.opendj.ldap.LDAPClientContext$ConnectionEventListener)</method>
<justification>Allows to register connection state listener</justification>
</difference>
<difference>
<className>org/forgerock/opendj/ldap/LDAPListener</className>
<differenceType>7005</differenceType>
- <method>LDAPListener(java.net.InetSocketAddress, org.forgerock.opendj.ldap.ServerConnectionFactory, org.forgerock.util.Options)</method>
- <to>%regex[LDAPListener\(java\.net\.SocketAddress,\s*org\.forgerock\.opendj\.ldap\.ServerConnectionFactory,\s*org\.forgerock\.util\.Options\)]</to>
- <justification>Accept multiple SocketAddress to bind to</justification>
- </difference>
- <difference>
- <className>org/forgerock/opendj/ldap/spi/LDAPListenerImpl</className>
- <differenceType>7002</differenceType>
- <method>java.net.InetSocketAddress getSocketAddress()</method>
- <justification>Accept multiple SocketAddress to bind to</justification>
- </difference>
- <difference>
- <className>org/forgerock/opendj/ldap/spi/LDAPListenerImpl</className>
- <differenceType>7012</differenceType>
- <method>java.util.Set getSocketAddresses()</method>
- <justification>Accept multiple SocketAddress to bind to</justification>
+ <method>%regex[LDAPListener\((((java\.lang\.String, +)?int,)|(java\.net\.InetSocketAddress,))\s*org\.forgerock\.opendj\.ldap\.ServerConnectionFactory(, +org\.forgerock\.util\.Options)?\)]</method>
+ <to>%regex[LDAPListener\((((java\.lang\.String, +)?int,)|(java\.util\.Set,)) +org\.forgerock\.util\.Function(, +org\.forgerock\.util\.Options)?\)]</to>
+ <justification>Allow multiple bind addressses. Use function rather than specific interface</justification>
</difference>
<difference>
<className>org/forgerock/opendj/ldap/LDAPListener</className>
@@ -195,13 +189,7 @@
<method>java.net.InetSocketAddress getSocketAddress()</method>
<justification>Accept multiple SocketAddress to bind to</justification>
</difference>
- <difference>
- <className>org/forgerock/opendj/ldap/LDAPListener</className>
- <differenceType>7012</differenceType>
- <method>java.util.Set getSocketAddresses()</method>
- <justification>Accept multiple SocketAddress to bind to</justification>
- </difference>
- <difference>
+ <difference>
<className>org/forgerock/opendj/ldap/LDAPListener</className>
<differenceType>7002</differenceType>
<method>java.net.InetAddress getAddress()</method>
@@ -220,11 +208,23 @@
<justification>Accept multiple SocketAddress to bind to</justification>
</difference>
<difference>
+ <className>org/forgerock/opendj/ldap/spi/LDAPListenerImpl</className>
+ <differenceType>7002</differenceType>
+ <method>java.net.InetSocketAddress getSocketAddress()</method>
+ <justification>Accept multiple SocketAddress to bind to</justification>
+ </difference>
+ <difference>
+ <className>org/forgerock/opendj/ldap/spi/LDAPListenerImpl</className>
+ <differenceType>7012</differenceType>
+ <method>java.util.Set getSocketAddresses()</method>
+ <justification>Accept multiple SocketAddress to bind to</justification>
+ </difference>
+ <difference>
<className>org/forgerock/opendj/ldap/spi/TransportProvider</className>
<differenceType>7005</differenceType>
<method>org.forgerock.opendj.ldap.spi.LDAPListenerImpl getLDAPListener(java.net.InetSocketAddress, org.forgerock.opendj.ldap.ServerConnectionFactory, org.forgerock.util.Options)</method>
- <to>%regex[org\.forgerock\.opendj\.ldap\.spi\.LDAPListenerImpl\s*getLDAPListener\(java\.util\.Set,\s*org\.forgerock\.opendj\.ldap\.ServerConnectionFactory,\s*org\.forgerock\.util\.Options\)]</to>
- <justification>Accept multiple SocketAddress to bind to</justification>
+ <to>%regex[org\.forgerock\.opendj\.ldap\.spi\.LDAPListenerImpl +getLDAPListener\(java\.util\.Set, +org\.forgerock\.util\.Function, +org\.forgerock\.util\.Options\)]</to>
+ <justification>Accept multiple SocketAddress to bind to. Use Function rather than specific interface</justification>
</difference>
<difference>
<className>org/forgerock/opendj/io/LDAP</className>
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Action.java b/opendj-core/src/main/java/com/forgerock/reactive/Action.java
index 72c541c..a7c8e96 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Action.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Action.java
@@ -23,7 +23,7 @@
* Runs the action and optionally throws a checked exception.
*
* @throws Exception
- * if the implementation wishes to throw a checked exception
+ * If an error occurred while performing the Action
*/
void run() throws Exception;
}
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
index b8391d6..dabe352 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -18,16 +18,20 @@
import org.forgerock.util.Function;
import org.reactivestreams.Publisher;
-/** {@link Completable} is used to communicates a terminated operation which doesn't produce a result. */
+/**
+ * {@link Completable} is used to communicates a terminated operation which doesn't produce a result. It extends
+ * {@link Publisher} by providing additional reactive methods allowing to act on it. The goal of this interface is to
+ * decouple ourself from reactive framework (RxJava/Reactor).
+ */
public interface Completable extends Publisher<Void> {
- /** Emitter is used to notify when the operation has been completed, successfully or not. */
+ /** Subscriber is notified when the operation has been completed, successfully or not. */
public interface Subscriber {
- /** Notify that this {@link Completable} is now completed. */
+ /** Called once this {@link Completable} is completed. */
void onComplete();
/**
- * Notify that this {@link Completable} cannot be completed because of an error.
+ * Called when this {@link Completable} cannot be completed because of an error.
*
* @param error
* The error preventing this {@link Completable} to complete
@@ -40,10 +44,12 @@
/**
* Called when the streaming api has been subscribed.
*
- * @param e
+ * @param s
* The {@link Subscriber} to use to communicate the completeness of this {@link Completable}
+ * @throws Exception
+ * on error
*/
- void subscribe(Subscriber e);
+ void subscribe(Subscriber s) throws Exception;
}
/**
@@ -57,7 +63,7 @@
Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function);
/**
- * Creates a {@link Single} which will emit the specified value when this {@link Completable} complete.
+ * Returns a {@link Single} which will complete with the provided value once this {@link Completable} completes.
*
* @param <V>
* Type of the value to emit
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Consumer.java b/opendj-core/src/main/java/com/forgerock/reactive/Consumer.java
index e3c3949..026bca8 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Consumer.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Consumer.java
@@ -23,12 +23,12 @@
*/
public interface Consumer<V> {
/**
- * Consume the given value.
+ * Consumes a value.
*
- * @param item
+ * @param value
* the value
* @throws Exception
* on error
*/
- void accept(V item) throws Exception;
+ void accept(V value) throws Exception;
}
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java b/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
index 689e54c..d98989d 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
@@ -16,26 +16,26 @@
package com.forgerock.reactive;
/**
- * Handle the processing of a request in a given context and return the resulting response.
+ * Handle the processing of an input in a given context and return the resulting output.
*
- * @param <CTX>
+ * @param <C>
* Type of the context
- * @param <REQ>
- * Type of the request to process
- * @param <REP>
- * Type of the response
+ * @param <I>
+ * Type of the input to process
+ * @param <O>
+ * Type of the output as a result of the process
*/
-public interface ReactiveHandler<CTX, REQ, REP> {
+public interface ReactiveHandler<C, I, O> {
/**
- * Process the request given the context.
+ * Process an input given the context.
*
* @param context
- * Context in which the request must be processed
- * @param request
- * The response to process
- * @return A {@link Single} response.
+ * Context in which the input must be processed
+ * @param input
+ * The input to process
+ * @return An output.
* @throws Exception
* if the request cannot be processed
*/
- REP handle(final CTX context, final REQ request) throws Exception;
+ O handle(final C context, final I input) throws Exception;
}
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index 35ed67c..de0d8bf 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -17,6 +17,7 @@
import org.forgerock.util.Function;
import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscription;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableOnSubscribe;
@@ -86,7 +87,8 @@
}
/**
- * Create a new {@link Single} from the given {@link Publisher}.
+ * Create a new {@link Single} from the given {@link Publisher}. If the {@link Publisher} produce more than one
+ * result, they'll be dropped and the inner {@link Subscription} cancelled.
*
* @param <V>
* Type of the datum emitted
@@ -223,7 +225,7 @@
}
@Override
- public Stream<V> onNextDo(final Consumer<V> onNext) {
+ public Stream<V> onNext(final Consumer<V> onNext) {
return new RxJavaStream<>(impl.doOnNext(new io.reactivex.functions.Consumer<V>() {
@Override
public void accept(V value) throws Exception {
@@ -233,7 +235,7 @@
}
@Override
- public Stream<V> onErrorDo(final Consumer<Throwable> onError) {
+ public Stream<V> onError(final Consumer<Throwable> onError) {
return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() {
@Override
public void accept(Throwable t) throws Exception {
@@ -254,7 +256,7 @@
}
@Override
- public Stream<V> onCompleteDo(final Action action) {
+ public Stream<V> onComplete(final Action action) {
return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.functions.Action() {
@Override
public void run() throws Exception {
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java b/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
new file mode 100644
index 0000000..dad0634
--- /dev/null
+++ b/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
@@ -0,0 +1,317 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package com.forgerock.reactive;
+
+import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
+import static org.forgerock.util.Reject.checkNotNull;
+
+import java.io.IOException;
+
+import org.forgerock.opendj.io.ASN1Reader;
+import org.forgerock.opendj.io.AbstractLDAPMessageHandler;
+import org.forgerock.opendj.io.LDAP;
+import org.forgerock.opendj.io.LDAPReader;
+import org.forgerock.opendj.ldap.DecodeException;
+import org.forgerock.opendj.ldap.DecodeOptions;
+import org.forgerock.opendj.ldap.IntermediateResponseHandler;
+import org.forgerock.opendj.ldap.LDAPClientContext;
+import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
+import org.forgerock.opendj.ldap.LdapException;
+import org.forgerock.opendj.ldap.LdapResultHandler;
+import org.forgerock.opendj.ldap.ResultCode;
+import org.forgerock.opendj.ldap.SearchResultHandler;
+import org.forgerock.opendj.ldap.ServerConnection;
+import org.forgerock.opendj.ldap.ServerConnectionFactory;
+import org.forgerock.opendj.ldap.requests.AbandonRequest;
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.GenericBindRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
+import org.forgerock.opendj.ldap.responses.BindResult;
+import org.forgerock.opendj.ldap.responses.CompareResult;
+import org.forgerock.opendj.ldap.responses.ExtendedResult;
+import org.forgerock.opendj.ldap.responses.IntermediateResponse;
+import org.forgerock.opendj.ldap.responses.Response;
+import org.forgerock.opendj.ldap.responses.Result;
+import org.forgerock.opendj.ldap.responses.SearchResultEntry;
+import org.forgerock.opendj.ldap.responses.SearchResultReference;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
+import org.forgerock.opendj.ldap.spi.TransportProvider;
+import org.forgerock.util.Function;
+import org.forgerock.util.promise.RuntimeExceptionHandler;
+
+import io.reactivex.BackpressureStrategy;
+import io.reactivex.Flowable;
+import io.reactivex.FlowableEmitter;
+import io.reactivex.FlowableOnSubscribe;
+
+/**
+ * Adapt a {@link ServerConnectionFactory} to a {@link Function} compatible with
+ * {@link TransportProvider#getLDAPListener(java.util.Set, Function, org.forgerock.util.Options)}.
+ */
+@Deprecated
+public final class ServerConnectionFactoryAdapter implements
+ Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+ LdapException> {
+
+ private final ServerConnectionFactory<LDAPClientContext, Integer> adaptee;
+ private final DecodeOptions decodeOptions;
+
+ /**
+ * Creates a new adapter.
+ *
+ * @param decodeOptions
+ * {@link DecodeOptions} used during request deserialization
+ * @param serverConnectionFactory
+ * the {@link ServerConnectionFactory} to adapt
+ */
+ public ServerConnectionFactoryAdapter(final DecodeOptions decodeOptions,
+ final ServerConnectionFactory<LDAPClientContext, Integer> serverConnectionFactory) {
+ this.decodeOptions = checkNotNull(decodeOptions, "decodeOptions must not be null");
+ this.adaptee = checkNotNull(serverConnectionFactory, "serverConnectionFactory must not be null");
+ }
+
+ @Override
+ public ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>> apply(
+ final LDAPClientContext clientContext) throws LdapException {
+ return new ServerConnectionAdapter(clientContext, decodeOptions, adaptee.handleAccept(clientContext));
+ }
+
+ /**
+ * Adapt a {@link ServerConnection} to a {@link Function} compatible with
+ * {@link TransportProvider#getLDAPListener(java.util.Set, Function, org.forgerock.util.Options)}.
+ */
+ public static final class ServerConnectionAdapter
+ implements ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>> {
+
+ private final ServerConnection<Integer> adaptee;
+ private final DecodeOptions decodeOptions;
+
+ /**
+ * Creates a new adapter.
+ *
+ * @param clientContext
+ * The {@link LDAPClientContext} which represents the client connection
+ * @param decodeOptions
+ * The {@link DecodeOptions} to use during request deserialization
+ * @param serverConnection
+ * the {@link ServerConnection} to adapt
+ */
+ public ServerConnectionAdapter(final LDAPClientContext clientContext, final DecodeOptions decodeOptions,
+ final ServerConnection<Integer> serverConnection) {
+ this.decodeOptions = checkNotNull(decodeOptions, "decodeOptions must not be null");
+ this.adaptee = checkNotNull(serverConnection, "serverConnection must not be null");
+ clientContext.addConnectionEventListener(new ConnectionEventListener() {
+ @Override
+ public void handleConnectionError(final LDAPClientContext context, final Throwable error) {
+ adaptee.handleConnectionError(error);
+ }
+
+ @Override
+ public void handleConnectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest) {
+ if (unbindRequest == null) {
+ adaptee.handleConnectionClosed(null, null);
+ } else {
+ adaptee.handleConnectionClosed(0, unbindRequest);
+ }
+ }
+
+ @Override
+ public void handleConnectionDisconnected(final LDAPClientContext context, final ResultCode resultCode,
+ final String diagnosticMessage) {
+ adaptee.handleConnectionDisconnected(resultCode, diagnosticMessage);
+ }
+ });
+ }
+
+ @Override
+ public Stream<Response> handle(final LDAPClientContext context, final LdapRequestEnvelope request)
+ throws Exception {
+ final LDAPReader<ASN1Reader> reader = LDAP.getReader(request.getContent(), decodeOptions);
+ return streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
+ @Override
+ public void subscribe(final FlowableEmitter<Response> emitter) throws Exception {
+ reader.readMessage(new AbstractLDAPMessageHandler() {
+ @Override
+ public void abandonRequest(int messageID, AbandonRequest request)
+ throws DecodeException, IOException {
+ handleAbandon(messageID, request, emitter);
+ }
+
+ @Override
+ public void addRequest(int messageID, AddRequest request) throws DecodeException, IOException {
+ handleAdd(messageID, request, emitter);
+ }
+
+ @Override
+ public void deleteRequest(final int messageID, final DeleteRequest request)
+ throws DecodeException, IOException {
+ handleDelete(messageID, request, emitter);
+ }
+
+ @Override
+ public void bindRequest(int messageID, int version, GenericBindRequest request)
+ throws DecodeException, IOException {
+ handleBind(messageID, version, request, emitter);
+ }
+
+ @Override
+ public void compareRequest(int messageID, CompareRequest request)
+ throws DecodeException, IOException {
+ handleCompare(messageID, request, emitter);
+ }
+
+ @Override
+ public <R extends ExtendedResult> void extendedRequest(int messageID,
+ ExtendedRequest<R> request) throws DecodeException, IOException {
+ handleExtendedRequest(messageID, request, emitter);
+ }
+
+ @Override
+ public void modifyDNRequest(int messageID, ModifyDNRequest request)
+ throws DecodeException, IOException {
+ handleModifyDN(messageID, request, emitter);
+ }
+
+ @Override
+ public void modifyRequest(int messageID, ModifyRequest request)
+ throws DecodeException, IOException {
+ handleModify(messageID, request, emitter);
+ }
+
+ @Override
+ public void searchRequest(int messageID, SearchRequest request)
+ throws DecodeException, IOException {
+ handleSearch(messageID, request, emitter);
+ }
+
+ @Override
+ public void unbindRequest(int messageID, UnbindRequest request)
+ throws DecodeException, IOException {
+ // Unbind request are received through ConnectionEventListener only
+ }
+ });
+ emitter.onComplete();
+ }
+ }, BackpressureStrategy.ERROR));
+ }
+
+ void handleAdd(final int requestId, final AddRequest request, final FlowableEmitter<Response> responseEmitter) {
+ final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+ adaptee.handleAdd(requestId, request, resultAdapter, resultAdapter);
+ }
+
+ void handleBind(final int requestId, final int version, final BindRequest request,
+ final FlowableEmitter<Response> responseEmitter) {
+ final ResultHandlerAdapter<BindResult> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+ adaptee.handleBind(requestId, version, request, resultAdapter, resultAdapter);
+ }
+
+ void handleCompare(final int requestId, final CompareRequest request,
+ final FlowableEmitter<Response> responseEmitter) {
+ final ResultHandlerAdapter<CompareResult> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+ adaptee.handleCompare(requestId, request, resultAdapter, resultAdapter);
+ }
+
+ void handleDelete(final int requestId, final DeleteRequest request,
+ final FlowableEmitter<Response> responseEmitter) {
+ final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+ adaptee.handleDelete(requestId, request, resultAdapter, resultAdapter);
+ }
+
+ <R extends ExtendedResult> void handleExtendedRequest(final int requestId, final ExtendedRequest<R> request,
+ final FlowableEmitter<Response> responseEmitter) {
+ final ResultHandlerAdapter<R> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+ adaptee.handleExtendedRequest(requestId, request, resultAdapter, resultAdapter);
+ }
+
+ void handleModify(final int requestId, final ModifyRequest request,
+ final FlowableEmitter<Response> responseEmitter) {
+ final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+ adaptee.handleModify(requestId, request, resultAdapter, resultAdapter);
+ }
+
+ void handleModifyDN(final int requestId, final ModifyDNRequest request,
+ final FlowableEmitter<Response> responseEmitter) {
+ final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+ adaptee.handleModifyDN(requestId, request, resultAdapter, resultAdapter);
+ }
+
+ void handleSearch(final int requestId, final SearchRequest request,
+ final FlowableEmitter<Response> responseEmitter) {
+ final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+ adaptee.handleSearch(requestId, request, resultAdapter, resultAdapter, resultAdapter);
+ }
+
+ void handleAbandon(int requestId, final AbandonRequest request, final FlowableEmitter<Response> emitter) {
+ adaptee.handleAbandon(requestId, request);
+ }
+
+ /** Forward all responses received from handler to a {@link LdapResponse}. */
+ private static final class ResultHandlerAdapter<R extends Response> implements IntermediateResponseHandler,
+ SearchResultHandler, LdapResultHandler<R>, RuntimeExceptionHandler {
+
+ private final FlowableEmitter<Response> adaptee;
+
+ ResultHandlerAdapter(final FlowableEmitter<Response> emitter) {
+ this.adaptee = emitter;
+ }
+
+ @Override
+ public boolean handleEntry(final SearchResultEntry entry) {
+ adaptee.onNext(entry);
+ return true;
+ }
+
+ @Override
+ public boolean handleReference(final SearchResultReference reference) {
+ adaptee.onNext(reference);
+ return true;
+ }
+
+ @Override
+ public boolean handleIntermediateResponse(final IntermediateResponse intermediateResponse) {
+ adaptee.onNext(intermediateResponse);
+ return true;
+ }
+
+ @Override
+ public void handleResult(R result) {
+ if (result != null) {
+ adaptee.onNext(result);
+ }
+ adaptee.onComplete();
+ }
+
+ @Override
+ public void handleRuntimeException(RuntimeException exception) {
+ adaptee.onError(exception);
+ }
+
+ @Override
+ public void handleException(LdapException exception) {
+ adaptee.onError(exception);
+ }
+ }
+ }
+}
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Single.java b/opendj-core/src/main/java/com/forgerock/reactive/Single.java
index a1f534f..2d8c678 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Single.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Single.java
@@ -19,40 +19,45 @@
import org.reactivestreams.Publisher;
/**
- * Single is a reactive-streams compatible promise.
+ * Single is a reactive-streams compatible promise. It extends {@link Publisher} by providing additional reactive
+ * methods allowing to act on it. The goal of this interface is to decouple ourself from reactive framework
+ * (RxJava/Reactor).
*
* @param <V>
* Type of the datum emitted
*/
public interface Single<V> extends Publisher<V> {
- /** Emitter is used to notify when the operation has been completed, successfully or not. */
+ /** Subscriber is notified when the operation has been completed, successfully or not. */
public interface Subscriber<V> {
/**
- * Signal a success value.
+ * Called once this {@link Single} is completed.
*
- * @param t
+ * @param value
* the value, not null
*/
- void onComplete(V t);
+ void onComplete(V value);
/**
- * Signal an exception.
+ * Called when this {@link Single} cannot be completed because of an error.
*
- * @param t
- * the exception, not null
+ * @param error
+ * The error preventing this {@link Single} to complete, not null
*/
- void onError(Throwable t);
+ void onError(Throwable error);
}
/** Adapts the streaming api to a callback one. */
public interface Emitter<V> {
/**
* Called for each SingleObserver that subscribes.
- * @param e the safe emitter instance, never null
- * @throws Exception on error
+ *
+ * @param s
+ * The {@link Subscriber} to use to communicate the completeness of this {@link Single}
+ * @throws Exception
+ * on error
*/
- void subscribe(Subscriber<V> e) throws Exception;
+ void subscribe(Subscriber<V> s) throws Exception;
}
/**
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Stream.java b/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
index 7138548..106f26f 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
@@ -19,7 +19,9 @@
import org.reactivestreams.Publisher;
/**
- * Stream is a reactive-streams compliant way to chain operations and transformation on a stream of data.
+ * Stream is a reactive-streams compliant way to chain operations and transformation on a stream of data. It extends
+ * {@link Publisher} by providing additional reactive methods allowing to act on it. The goal of this interface is to
+ * decouple ourself from reactive framework (RxJava/Reactor).
*
* @param <V>
* Type of data emitted
@@ -27,7 +29,7 @@
public interface Stream<V> extends Publisher<V> {
/**
- * Transform the data emitted by this stream.
+ * Transforms the data emitted by this stream.
*
* @param <O>
* Type of data emitted after transformation
@@ -38,7 +40,8 @@
<O> Stream<O> map(Function<V, O, Exception> function);
/**
- * Transform each data emitted by this stream into a new stream of data. All these streams are then merged together.
+ * Transforms each data emitted by this stream into a new stream of data. All these streams are then merged
+ * together.
*
* @param <O>
* Type of data emitted after transformation
@@ -69,7 +72,7 @@
* The {@link Consumer} to invoke when a value is emitted by this stream
* @return a new {@link Stream}
*/
- Stream<V> onNextDo(Consumer<V> onNext);
+ Stream<V> onNext(Consumer<V> onNext);
/**
* Invokes the on error {@link Consumer} when an error occurs on this stream.
@@ -78,7 +81,7 @@
* The {@link Consumer} to invoke on error
* @return a new {@link Stream}
*/
- Stream<V> onErrorDo(Consumer<Throwable> onError);
+ Stream<V> onError(Consumer<Throwable> onError);
/**
* Invokes the on complete {@link Action} when this stream is completed.
@@ -87,10 +90,10 @@
* The {@link Action} to invoke on stream completion
* @return a new {@link Stream}
*/
- Stream<V> onCompleteDo(Action onComplete);
+ Stream<V> onComplete(Action onComplete);
/**
- * Subscribe to this stream and drop all data produced by it.
+ * Subscribes to this stream and drop all data produced by it.
*/
void subscribe();
}
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java b/opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java
index 1720efc..31c9248 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java
@@ -17,6 +17,7 @@
package org.forgerock.opendj.io;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -153,6 +154,19 @@
* The protocol op type for unbind requests.
*/
public static final byte OP_TYPE_UNBIND_REQUEST = 0x42;
+ /** Mapping between request protocol op and their respecetive response protocol op. */
+ public static final byte[] OP_TO_RESULT_TYPE = new byte[0xFF];
+ static {
+ Arrays.fill(OP_TO_RESULT_TYPE, (byte) 0x00);
+ OP_TO_RESULT_TYPE[OP_TYPE_ADD_REQUEST] = OP_TYPE_ADD_RESPONSE;
+ OP_TO_RESULT_TYPE[OP_TYPE_BIND_REQUEST] = OP_TYPE_BIND_RESPONSE;
+ OP_TO_RESULT_TYPE[OP_TYPE_COMPARE_REQUEST] = OP_TYPE_COMPARE_RESPONSE;
+ OP_TO_RESULT_TYPE[OP_TYPE_DELETE_REQUEST] = OP_TYPE_DELETE_RESPONSE;
+ OP_TO_RESULT_TYPE[OP_TYPE_EXTENDED_REQUEST] = OP_TYPE_EXTENDED_RESPONSE;
+ OP_TO_RESULT_TYPE[OP_TYPE_MODIFY_DN_REQUEST] = OP_TYPE_MODIFY_DN_RESPONSE;
+ OP_TO_RESULT_TYPE[OP_TYPE_MODIFY_REQUEST] = OP_TYPE_MODIFY_RESPONSE;
+ OP_TO_RESULT_TYPE[OP_TYPE_SEARCH_REQUEST] = OP_TYPE_SEARCH_RESULT_DONE;
+ };
/**
* The BER type to use for the AuthenticationChoice element in a bind
* request when SASL authentication is to be used.
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/CommonLDAPOptions.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/CommonLDAPOptions.java
index 1ea5b1d..2354990 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/CommonLDAPOptions.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/CommonLDAPOptions.java
@@ -24,7 +24,7 @@
/**
* Common options for LDAP clients and listeners.
*/
-public abstract class CommonLDAPOptions {
+abstract class CommonLDAPOptions {
/**
* Specifies the class loader which will be used to load the
* {@link org.forgerock.opendj.ldap.spi.TransportProvider TransportProvider}.
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
index bd212cb..83d4cf5 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -18,6 +18,7 @@
package org.forgerock.opendj.ldap;
import java.net.InetSocketAddress;
+import java.util.EventListener;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;
@@ -35,7 +36,7 @@
public interface LDAPClientContext {
/** Listens for disconnection event. */
- public interface DisconnectListener {
+ public interface ConnectionEventListener extends EventListener {
/**
* Invoked when the connection has been disconnected because of an error (e.g: message too big).
*
@@ -44,7 +45,7 @@
* @param error
* The error
*/
- void exceptionOccurred(LDAPClientContext context, Throwable error);
+ void handleConnectionError(LDAPClientContext context, Throwable error);
/**
* Invoked when the client closed the connection, possibly using an unbind request.
@@ -55,7 +56,7 @@
* The unbind request, which may be {@code null} if one was not sent before the connection was
* closed.
*/
- void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest);
+ void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest);
/**
* Invoked when the connection has been disconnected by the server.
@@ -68,28 +69,28 @@
* @param diagnosticMessage
* The diagnostic message, which may be empty or {@code null} indicating that none was provided.
*/
- void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage);
+ void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage);
}
/**
* Register a listener which will be notified when this {@link LDAPClientContext} is disconnected.
*
- * @param listener The {@link DisconnectListener} to register.
+ * @param listener The {@link ConnectionEventListener} to register.
*/
- void onDisconnect(DisconnectListener listener);
+ void addConnectionEventListener(ConnectionEventListener listener);
/**
* Disconnects the client without sending a disconnect notification. Invoking this method causes
- * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this
- * method returns.
+ * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called
+ * before this method returns.
*/
void disconnect();
/**
* Disconnects the client and sends a disconnect notification, containing the provided result code and diagnostic
* message. Invoking this method causes
- * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this
- * method returns.
+ * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called
+ * before this method returns.
*
* @param resultCode
* The result code to include with the disconnect notification
@@ -136,6 +137,15 @@
SSLSession getSSLSession();
/**
+ * Returns the {@link SaslServer} currently in use by the underlying connection, or
+ * {@code null} if SASL integrity and/or privacy protection is not enabled.
+ *
+ * @return The {@link SaslServer} currently in use by the underlying connection, or
+ * {@code null} if SASL integrity and/or privacy protection is not enabled.
+ */
+ SaslServer getSASLServer();
+
+ /**
* Returns {@code true} if the underlying connection has been closed as a
* result of a client disconnect, a fatal connection error, or a server-side
* {@link #disconnect}.
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java
index 2bb4497..dac0421 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java
@@ -17,8 +17,8 @@
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.ldap.CoreMessages.HBCF_CONNECTION_CLOSED_BY_CLIENT;
-import static com.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_UNEXPECTED;
import static com.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_TIMEOUT;
+import static com.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_UNEXPECTED;
import static com.forgerock.opendj.ldap.CoreMessages.LDAP_CONNECTION_CONNECT_TIMEOUT;
import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
import static java.util.concurrent.TimeUnit.*;
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java
index 6b9bee7..647d445 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java
@@ -22,12 +22,18 @@
import java.util.Collections;
import java.util.Set;
+import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.spi.LDAPListenerImpl;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.opendj.ldap.spi.TransportProvider;
+import org.forgerock.util.Function;
import org.forgerock.util.Option;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
+import com.forgerock.reactive.ReactiveHandler;
+import com.forgerock.reactive.Stream;
+
/**
* An LDAP server connection listener which waits for LDAP connection requests
* to come in over the network and binds them to a {@link ServerConnection}
@@ -119,8 +125,7 @@
* @param port
* The port to listen on.
* @param factory
- * The server connection factory which will be used to create
- * server connections.
+ * The handler factory which will be used to create handlers.
* @throws IOException
* If an error occurred while trying to listen on the provided
* address.
@@ -128,7 +133,9 @@
* If {code factory} was {@code null}.
*/
public LDAPListener(final int port,
- final ServerConnectionFactory<LDAPClientContext, Integer> factory) throws IOException {
+ final Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+ LdapException> factory) throws IOException {
this(port, factory, Options.defaultOptions());
}
@@ -139,8 +146,7 @@
* @param port
* The port to listen on.
* @param factory
- * The server connection factory which will be used to create
- * server connections.
+ * The handler factory which will be used to create handlers.
* @param options
* The LDAP listener options.
* @throws IOException
@@ -150,40 +156,42 @@
* If {code factory} or {@code options} was {@code null}.
*/
public LDAPListener(final int port,
- final ServerConnectionFactory<LDAPClientContext, Integer> factory,
+ final Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+ LdapException> factory,
final Options options) throws IOException {
- this(new InetSocketAddress(port), factory, options);
+ this(Collections.singleton(new InetSocketAddress(port)), factory, options);
}
/**
* Creates a new LDAP listener implementation which will listen for LDAP
* client connections at the provided address.
*
- * @param address
- * The address to listen on.
+ * @param addresses
+ * The addresses to listen on.
* @param factory
- * The server connection factory which will be used to create
- * server connections.
+ * The handler factory which will be used to create handlers.
* @throws IOException
* If an error occurred while trying to listen on the provided
* address.
* @throws NullPointerException
* If {@code address} or {code factory} was {@code null}.
*/
- public LDAPListener(final InetSocketAddress address,
- final ServerConnectionFactory<LDAPClientContext, Integer> factory) throws IOException {
- this(address, factory, Options.defaultOptions());
+ public LDAPListener(final Set<InetSocketAddress> addresses,
+ final Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+ LdapException> factory) throws IOException {
+ this(addresses, factory, Options.defaultOptions());
}
/**
* Creates a new LDAP listener implementation which will listen for LDAP
* client connections at the provided address.
*
- * @param address
- * The address to listen on.
+ * @param addresses
+ * The addresses to listen on.
* @param factory
- * The server connection factory which will be used to create
- * server connections.
+ * The handler factory which will be used to create handlers.
* @param options
* The LDAP listener options.
* @throws IOException
@@ -193,12 +201,14 @@
* If {@code address}, {code factory}, or {@code options} was
* {@code null}.
*/
- public LDAPListener(final InetSocketAddress address,
- final ServerConnectionFactory<LDAPClientContext, Integer> factory,
+ public LDAPListener(final Set<InetSocketAddress> addresses,
+ final Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+ LdapException> factory,
final Options options) throws IOException {
- Reject.ifNull(address, factory, options);
+ Reject.ifNull(addresses, factory, options);
this.provider = getTransportProvider(options);
- this.impl = provider.getLDAPListener(Collections.singleton(address), factory, options);
+ this.impl = provider.getLDAPListener(addresses, factory, options);
}
/**
@@ -210,8 +220,7 @@
* @param port
* The port to listen on.
* @param factory
- * The server connection factory which will be used to create
- * server connections.
+ * The handler factory which will be used to create handlers.
* @throws IOException
* If an error occurred while trying to listen on the provided
* address.
@@ -219,7 +228,9 @@
* If {@code host} or {code factory} was {@code null}.
*/
public LDAPListener(final String host, final int port,
- final ServerConnectionFactory<LDAPClientContext, Integer> factory) throws IOException {
+ final Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+ LdapException> factory) throws IOException {
this(host, port, factory, Options.defaultOptions());
}
@@ -232,8 +243,7 @@
* @param port
* The port to listen on.
* @param factory
- * The server connection factory which will be used to create
- * server connections.
+ * The handler factory which will be used to create handlers.
* @param options
* The LDAP listener options.
* @throws IOException
@@ -244,9 +254,11 @@
* {@code null}.
*/
public LDAPListener(final String host, final int port,
- final ServerConnectionFactory<LDAPClientContext, Integer> factory,
+ final Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+ LdapException> factory,
final Options options) throws IOException {
- this(new InetSocketAddress(host, port), factory, options);
+ this(Collections.singleton(new InetSocketAddress(host, port)), factory, options);
}
/** Closes this LDAP connection listener. */
@@ -256,15 +268,24 @@
}
/**
- * Returns the address that this LDAP listener is listening on.
+ * Returns the addresses that this LDAP listener is listening on.
*
- * @return The address that this LDAP listener is listening on.
+ * @return The addresses that this LDAP listener is listening on.
*/
public Set<InetSocketAddress> getSocketAddresses() {
return impl.getSocketAddresses();
}
/**
+ * Returns the first address that his LDAP listener is listening on.
+ *
+ * @return The addresses that this LDAP listener is listening on.
+ */
+ public InetSocketAddress firstSocketAddress() {
+ return impl.getSocketAddresses().iterator().next();
+ }
+
+ /**
* Returns the name of the transport provider, which provides the implementation
* of this factory.
*
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnection.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnection.java
index a8eab85..09690dc 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnection.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnection.java
@@ -12,7 +12,7 @@
* information: "Portions Copyright [year] [name of copyright owner]".
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2012 ForgeRock AS.
+ * Portions copyright 2012-2016 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
@@ -35,6 +35,7 @@
* The type of request context.
* @see ServerConnectionFactory
*/
+@Deprecated
public interface ServerConnection<C> extends RequestHandler<C> {
/**
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnectionFactory.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnectionFactory.java
index 996b216..0037bb9 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnectionFactory.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnectionFactory.java
@@ -12,7 +12,7 @@
* information: "Portions Copyright [year] [name of copyright owner]".
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions Copyright 2014 ForgeRock AS.
+ * Portions Copyright 2014-2016 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
@@ -33,6 +33,7 @@
* @see Connections#newInternalConnectionFactory(ServerConnectionFactory,
* Object) newInternalConnectionFactory
*/
+@Deprecated
public interface ServerConnectionFactory<C, R> {
/**
* Invoked when a new client connection is accepted by the associated
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java
index a2efe6a..b4603a2 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java
@@ -21,7 +21,7 @@
import org.forgerock.opendj.ldap.responses.Response;
/**
- * Contains statics methods to create ldap messages.
+ * Contains static methods to create ldap messages.
*/
public final class LdapMessages {
@@ -30,23 +30,23 @@
}
/**
- * Creates a new {@link LdapRawMessage} containing a partially decoded LDAP message.
+ * Creates a new {@link } containing a partially decoded LDAP message.
*
* @param messageType
* Operation code of the message
* @param messageId
* Unique identifier of this message
- * @param protocolVersion
+ * @param ldapVersion
* Protocol version to use (only for Bind requests)
* @param rawDn
* Unparsed name contained in the request (or null if DN is not applicable)
* @param reader
* An {@link ASN1Reader} containing the full encoded ldap message packet.
- * @return A new {@link LdapRawMessage}
+ * @return A new {@link LdapRequestEnvelope}
*/
- public static LdapRawMessage newRawMessage(final byte messageType, final int messageId, final int protocolVersion,
- final ByteString rawDn, final ASN1Reader reader) {
- return new LdapRawMessage(messageType, messageId, protocolVersion, rawDn, reader);
+ public static LdapRequestEnvelope newRequestEnvelope(final byte messageType, final int messageId,
+ final int ldapVersion, final ByteString rawDn, final ASN1Reader reader) {
+ return new LdapRequestEnvelope(messageType, messageId, ldapVersion, rawDn, reader);
}
/**
@@ -67,16 +67,16 @@
}
/**
- * Represents an encoded LDAP message with it's envelope.
+ * Represents a Ldap Request envelope containing an encoded Request.
*/
- public static final class LdapRawMessage extends LdapMessageEnvelope<ASN1Reader> {
+ public static final class LdapRequestEnvelope extends LdapMessageEnvelope<ASN1Reader> {
private final ByteString rawDn;
- private final int version;
+ private final int ldapVersion;
- private LdapRawMessage(final byte messageType, final int messageId, final int version, final ByteString rawDn,
- final ASN1Reader content) {
+ private LdapRequestEnvelope(final byte messageType, final int messageId, final int ldapVersion,
+ final ByteString rawDn, final ASN1Reader content) {
super(messageType, messageId, content);
- this.version = version;
+ this.ldapVersion = ldapVersion;
this.rawDn = rawDn;
}
@@ -85,8 +85,8 @@
*
* @return The ldap protocol version
*/
- public int getVersion() {
- return version;
+ public int getLdapVersion() {
+ return ldapVersion;
}
/**
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java
index 37453be..9803230 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java
@@ -20,9 +20,15 @@
import java.util.Set;
import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.ServerConnectionFactory;
+import org.forgerock.opendj.ldap.LdapException;
+import org.forgerock.opendj.ldap.responses.Response;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
+import org.forgerock.util.Function;
import org.forgerock.util.Options;
+import com.forgerock.reactive.ReactiveHandler;
+import com.forgerock.reactive.Stream;
+
/**
* Interface for transport providers, which provide implementation
* for {@code LDAPConnectionFactory} and {@code LDAPListener} classes,
@@ -55,8 +61,7 @@
* @param addresses
* The addresses to listen on.
* @param factory
- * The server connection factory which will be used to create
- * server connections.
+ * The handler factory which will be used to create handlers.
* @param options
* The LDAP listener options.
* @return an implementation of {@code LDAPListener}
@@ -65,6 +70,8 @@
* address.
*/
LDAPListenerImpl getLDAPListener(Set<InetSocketAddress> addresses,
- ServerConnectionFactory<LDAPClientContext, Integer> factory, Options options)
- throws IOException;
+ Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+ LdapException> factory,
+ Options options) throws IOException;
}
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java
index 86f16c1..7c8a436 100644
--- a/opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java
@@ -17,12 +17,13 @@
package org.forgerock.opendj.ldap;
-import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
import static org.forgerock.opendj.ldap.LdapException.newLdapException;
import static org.forgerock.opendj.ldap.TestCaseUtils.loopbackWithDynamicPort;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -71,6 +72,7 @@
import com.forgerock.opendj.ldap.controls.AccountUsabilityRequestControl;
import com.forgerock.opendj.ldap.controls.AccountUsabilityResponseControl;
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
/**
* A simple ldap server that manages 1000 entries and used for running
@@ -517,8 +519,10 @@
return;
}
sslContext = new SSLContextBuilder().getSSLContext();
- listener = new LDAPListener(loopbackWithDynamicPort(), getInstance(),
- Options.defaultOptions().set(CONNECT_MAX_BACKLOG, 4096));
+ listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+ getInstance()),
+ Options.defaultOptions().set(CONNECT_MAX_BACKLOG, 4096));
isRunning = true;
}
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java
index bc01991..d98bdd9 100644
--- a/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java
@@ -21,14 +21,22 @@
import java.util.Set;
import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.ServerConnectionFactory;
+import org.forgerock.opendj.ldap.LdapException;
+import org.forgerock.opendj.ldap.responses.Response;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
+import org.forgerock.util.Function;
import org.forgerock.util.Options;
+import com.forgerock.reactive.ReactiveHandler;
+import com.forgerock.reactive.Stream;
+
/**
* Basic LDAP listener implementation to use for tests only.
*/
public final class BasicLDAPListener implements LDAPListenerImpl {
- private final ServerConnectionFactory<LDAPClientContext, Integer> connectionFactory;
+ private final Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+ LdapException> connectionFactory;
private final Set<InetSocketAddress> socketAddresses;
/**
@@ -37,15 +45,16 @@
* @param address
* The address to listen on.
* @param factory
- * The server connection factory can be used to create
- * server connections.
+ * The server connection factory can be used to create server connections.
* @param options
* The LDAP listener options.
* @throws IOException
* is never thrown with this do-nothing implementation
*/
public BasicLDAPListener(final Set<InetSocketAddress> addresses,
- final ServerConnectionFactory<LDAPClientContext, Integer> factory,
+ final Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+ LdapException> factory,
final Options options) throws IOException {
this.connectionFactory = factory;
this.socketAddresses = addresses;
@@ -70,7 +79,9 @@
return builder.toString();
}
- ServerConnectionFactory<LDAPClientContext, Integer> getConnectionFactory() {
+ Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+ LdapException> getConnectionFactory() {
return connectionFactory;
}
}
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java
index f397c80..0410ab5 100644
--- a/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java
@@ -20,9 +20,15 @@
import java.util.Set;
import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.ServerConnectionFactory;
+import org.forgerock.opendj.ldap.LdapException;
+import org.forgerock.opendj.ldap.responses.Response;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
+import org.forgerock.util.Function;
import org.forgerock.util.Options;
+import com.forgerock.reactive.ReactiveHandler;
+import com.forgerock.reactive.Stream;
+
/**
* Provides an basic implementation of a transport provider doing nothing.
* This should be used for tests only.
@@ -45,7 +51,10 @@
@Override
public LDAPListenerImpl getLDAPListener(Set<InetSocketAddress> addresses,
- ServerConnectionFactory<LDAPClientContext, Integer> factory, Options options) throws IOException {
+ Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+ LdapException> factory,
+ Options options) throws IOException {
return new BasicLDAPListener(addresses, factory, options);
}
diff --git a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
index 126f97b..6aa495a 100644
--- a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
+++ b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
@@ -15,42 +15,18 @@
*/
package com.forgerock.opendj.grizzly;
-import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Set;
import org.forgerock.opendj.grizzly.GrizzlyLDAPConnectionFactory;
import org.forgerock.opendj.grizzly.GrizzlyLDAPListener;
-import org.forgerock.opendj.io.ASN1Reader;
-import org.forgerock.opendj.io.AbstractLDAPMessageHandler;
-import org.forgerock.opendj.io.LDAP;
-import org.forgerock.opendj.io.LDAPReader;
-import org.forgerock.opendj.ldap.CommonLDAPOptions;
-import org.forgerock.opendj.ldap.DecodeException;
-import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
import org.forgerock.opendj.ldap.LdapException;
-import org.forgerock.opendj.ldap.ResultCode;
-import org.forgerock.opendj.ldap.ServerConnection;
-import org.forgerock.opendj.ldap.ServerConnectionFactory;
-import org.forgerock.opendj.ldap.requests.AbandonRequest;
-import org.forgerock.opendj.ldap.requests.AddRequest;
-import org.forgerock.opendj.ldap.requests.CompareRequest;
-import org.forgerock.opendj.ldap.requests.DeleteRequest;
-import org.forgerock.opendj.ldap.requests.ExtendedRequest;
-import org.forgerock.opendj.ldap.requests.GenericBindRequest;
-import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
-import org.forgerock.opendj.ldap.requests.ModifyRequest;
-import org.forgerock.opendj.ldap.requests.SearchRequest;
-import org.forgerock.opendj.ldap.requests.UnbindRequest;
-import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.spi.LDAPConnectionFactoryImpl;
import org.forgerock.opendj.ldap.spi.LDAPListenerImpl;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.opendj.ldap.spi.TransportProvider;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
@@ -58,11 +34,6 @@
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
-import io.reactivex.BackpressureStrategy;
-import io.reactivex.Flowable;
-import io.reactivex.FlowableEmitter;
-import io.reactivex.FlowableOnSubscribe;
-
/**
* Grizzly transport provider implementation.
*/
@@ -75,122 +46,15 @@
@Override
public LDAPListenerImpl getLDAPListener(final Set<InetSocketAddress> addresses,
- final ServerConnectionFactory<LDAPClientContext, Integer> factory, final Options options)
- throws IOException {
- return new GrizzlyLDAPListener(addresses, options,
- new Function<LDAPClientContext,
- ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
- LdapException>() {
- @Override
- public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> apply(
- final LDAPClientContext clientContext) throws LdapException {
- return newHandler(clientContext, factory, options);
- }
- });
+ final Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+ LdapException> factory,
+ final Options options) throws IOException {
+ return new GrizzlyLDAPListener(addresses, options, factory);
}
@Override
public String getName() {
return "Grizzly";
}
-
- private ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> newHandler(
- final LDAPClientContext clientContext, final ServerConnectionFactory<LDAPClientContext, Integer> factory,
- final Options options) throws LdapException {
- final ServerConnection<Integer> serverConnection = factory.handleAccept(clientContext);
- final ServerConnectionAdaptor<Integer> adapter = new ServerConnectionAdaptor<>(serverConnection);
- clientContext.onDisconnect(new DisconnectListener() {
- @Override
- public void exceptionOccurred(final LDAPClientContext context, final Throwable error) {
- serverConnection.handleConnectionError(error);
- }
-
- @Override
- public void connectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest) {
- serverConnection.handleConnectionClosed(0, unbindRequest);
- }
-
- @Override
- public void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode,
- final String diagnosticMessage) {
- serverConnection.handleConnectionDisconnected(resultCode, diagnosticMessage);
- }
- });
-
- final DecodeOptions decodeOptions = options.get(CommonLDAPOptions.LDAP_DECODE_OPTIONS);
- return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
- @Override
- public Stream<Response> handle(final LDAPClientContext context,
- final LdapRawMessage rawRequest) throws Exception {
- final LDAPReader<ASN1Reader> reader = LDAP.getReader(rawRequest.getContent(), decodeOptions);
- return streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
- @Override
- public void subscribe(final FlowableEmitter<Response> emitter) throws Exception {
- reader.readMessage(new AbstractLDAPMessageHandler() {
- @Override
- public void abandonRequest(int messageID, AbandonRequest request)
- throws DecodeException, IOException {
- adapter.handleAbandon(messageID, request, emitter);
- }
-
- @Override
- public void addRequest(int messageID, AddRequest request)
- throws DecodeException, IOException {
- adapter.handleAdd(messageID, request, emitter);
- }
-
- @Override
- public void deleteRequest(final int messageID, final DeleteRequest request)
- throws DecodeException, IOException {
- adapter.handleDelete(messageID, request, emitter);
- }
-
- @Override
- public void bindRequest(int messageID, int version, GenericBindRequest request)
- throws DecodeException, IOException {
- adapter.handleBind(messageID, version, request, emitter);
- }
-
- @Override
- public void compareRequest(int messageID, CompareRequest request)
- throws DecodeException, IOException {
- adapter.handleCompare(messageID, request, emitter);
- }
-
- @Override
- public <R extends ExtendedResult> void extendedRequest(int messageID,
- ExtendedRequest<R> request) throws DecodeException, IOException {
- adapter.handleExtendedRequest(messageID, request, emitter);
- }
-
- @Override
- public void modifyDNRequest(int messageID, ModifyDNRequest request)
- throws DecodeException, IOException {
- adapter.handleModifyDN(messageID, request, emitter);
- }
-
- @Override
- public void modifyRequest(int messageID, ModifyRequest request)
- throws DecodeException, IOException {
- adapter.handleModify(messageID, request, emitter);
- }
-
- @Override
- public void searchRequest(int messageID, SearchRequest request)
- throws DecodeException, IOException {
- adapter.handleSearch(messageID, request, emitter);
- }
-
- @Override
- public void unbindRequest(int messageID, UnbindRequest request)
- throws DecodeException, IOException {
- serverConnection.handleConnectionClosed(messageID, request);
- }
- });
- emitter.onComplete();
- }
- }, BackpressureStrategy.ERROR));
- }
- };
- }
}
diff --git a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/ServerConnectionAdaptor.java b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/ServerConnectionAdaptor.java
deleted file mode 100644
index 9318052..0000000
--- a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/ServerConnectionAdaptor.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * The contents of this file are subject to the terms of the Common Development and
- * Distribution License (the License). You may not use this file except in compliance with the
- * License.
- *
- * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
- * specific language governing permission and limitations under the License.
- *
- * When distributing Covered Software, include this CDDL Header Notice in each file and include
- * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
- * Header, with the fields enclosed by brackets [] replaced by your own identifying
- * information: "Portions Copyright [year] [name of copyright owner]".
- *
- * Copyright 2016 ForgeRock AS.
- */
-package com.forgerock.opendj.grizzly;
-
-import static org.forgerock.util.Reject.checkNotNull;
-
-import org.forgerock.opendj.ldap.IntermediateResponseHandler;
-import org.forgerock.opendj.ldap.LdapException;
-import org.forgerock.opendj.ldap.LdapResultHandler;
-import org.forgerock.opendj.ldap.SearchResultHandler;
-import org.forgerock.opendj.ldap.ServerConnection;
-import org.forgerock.opendj.ldap.requests.AbandonRequest;
-import org.forgerock.opendj.ldap.requests.AddRequest;
-import org.forgerock.opendj.ldap.requests.BindRequest;
-import org.forgerock.opendj.ldap.requests.CompareRequest;
-import org.forgerock.opendj.ldap.requests.DeleteRequest;
-import org.forgerock.opendj.ldap.requests.ExtendedRequest;
-import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
-import org.forgerock.opendj.ldap.requests.ModifyRequest;
-import org.forgerock.opendj.ldap.requests.SearchRequest;
-import org.forgerock.opendj.ldap.responses.BindResult;
-import org.forgerock.opendj.ldap.responses.CompareResult;
-import org.forgerock.opendj.ldap.responses.ExtendedResult;
-import org.forgerock.opendj.ldap.responses.IntermediateResponse;
-import org.forgerock.opendj.ldap.responses.Response;
-import org.forgerock.opendj.ldap.responses.Result;
-import org.forgerock.opendj.ldap.responses.SearchResultEntry;
-import org.forgerock.opendj.ldap.responses.SearchResultReference;
-import org.forgerock.util.promise.RuntimeExceptionHandler;
-
-import io.reactivex.FlowableEmitter;
-
-final class ServerConnectionAdaptor<C> {
-
- private final ServerConnection<C> adaptee;
-
- public ServerConnectionAdaptor(final ServerConnection<C> handler) {
- this.adaptee = checkNotNull(handler, "handler must not be null");
- }
-
- public void handleAdd(final C requestContext, final AddRequest request,
- final FlowableEmitter<Response> response) {
- final ResultHandlerAdaptor<Result> resultAdapter = new ResultHandlerAdaptor<>(response);
- adaptee.handleAdd(requestContext, request, resultAdapter, resultAdapter);
- }
-
- public void handleBind(final C requestContext, final int version, final BindRequest request,
- final FlowableEmitter<Response> response) {
- final ResultHandlerAdaptor<BindResult> resultAdapter = new ResultHandlerAdaptor<>(response);
- adaptee.handleBind(requestContext, version, request, resultAdapter, resultAdapter);
- }
-
- public void handleCompare(final C requestContext, final CompareRequest request,
- final FlowableEmitter<Response> response) {
- final ResultHandlerAdaptor<CompareResult> resultAdapter = new ResultHandlerAdaptor<>(response);
- adaptee.handleCompare(requestContext, request, resultAdapter, resultAdapter);
- }
-
- public void handleDelete(final C requestContext, final DeleteRequest request,
- final FlowableEmitter<Response> response) {
- final ResultHandlerAdaptor<Result> resultAdapter = new ResultHandlerAdaptor<>(response);
- adaptee.handleDelete(requestContext, request, resultAdapter, resultAdapter);
- }
-
- public <R extends ExtendedResult> void handleExtendedRequest(final C requestContext,
- final ExtendedRequest<R> request, final FlowableEmitter<Response> response) {
- final ResultHandlerAdaptor<R> resultAdapter = new ResultHandlerAdaptor<>(response);
- adaptee.handleExtendedRequest(requestContext, request, resultAdapter, resultAdapter);
- }
-
- public void handleModify(final C requestContext, final ModifyRequest request,
- final FlowableEmitter<Response> response) {
- final ResultHandlerAdaptor<Result> resultAdapter = new ResultHandlerAdaptor<>(response);
- adaptee.handleModify(requestContext, request, resultAdapter, resultAdapter);
- }
-
- public void handleModifyDN(final C requestContext, final ModifyDNRequest request,
- final FlowableEmitter<Response> response) {
- final ResultHandlerAdaptor<Result> resultAdapter = new ResultHandlerAdaptor<>(response);
- adaptee.handleModifyDN(requestContext, request, resultAdapter, resultAdapter);
- }
-
- public void handleSearch(final C requestContext, final SearchRequest request,
- final FlowableEmitter<Response> response) {
- final ResultHandlerAdaptor<Result> resultAdapter = new ResultHandlerAdaptor<>(response);
- adaptee.handleSearch(requestContext, request, resultAdapter, resultAdapter, resultAdapter);
- }
-
- public void handleAbandon(C requestContext, final AbandonRequest request, final FlowableEmitter<Response> out) {
- adaptee.handleAbandon(requestContext, request);
- }
-
- /**
- * Forward all response received from handler to a {@link LdapResponse}.
- */
- private static final class ResultHandlerAdaptor<R extends Response>
- implements IntermediateResponseHandler, SearchResultHandler, LdapResultHandler<R>, RuntimeExceptionHandler {
-
- private final FlowableEmitter<Response> adaptee;
-
- ResultHandlerAdaptor(final FlowableEmitter<Response> emitter) {
- this.adaptee = emitter;
- }
-
- @Override
- public boolean handleEntry(final SearchResultEntry entry) {
- adaptee.onNext(entry);
- return true;
- }
-
- @Override
- public boolean handleReference(final SearchResultReference reference) {
- adaptee.onNext(reference);
- return true;
- }
-
- @Override
- public boolean handleIntermediateResponse(final IntermediateResponse intermediateResponse) {
- adaptee.onNext(intermediateResponse);
- return true;
- }
-
- @Override
- public void handleResult(R result) {
- if (result != null) {
- adaptee.onNext(result);
- }
- adaptee.onComplete();
- }
-
- @Override
- public void handleRuntimeException(RuntimeException exception) {
- adaptee.onError(exception);
- }
-
- @Override
- public void handleException(LdapException exception) {
- adaptee.onError(exception);
- }
- }
-}
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
index fe1df91..2ebbf43 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
@@ -17,7 +17,6 @@
package org.forgerock.opendj.grizzly;
import static org.forgerock.opendj.grizzly.ServerTCPNIOTransport.SERVER_TRANSPORT;
-import static org.forgerock.opendj.ldap.CommonLDAPOptions.LDAP_DECODE_OPTIONS;
import static org.forgerock.opendj.ldap.LDAPListener.*;
import java.io.IOException;
@@ -34,11 +33,10 @@
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.spi.LDAPListenerImpl;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.glassfish.grizzly.filterchain.FilterChain;
-import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.nio.transport.TCPNIOBindingHandler;
import org.glassfish.grizzly.nio.transport.TCPNIOConnection;
import org.glassfish.grizzly.nio.transport.TCPNIOServerConnection;
@@ -74,7 +72,7 @@
*/
public GrizzlyLDAPListener(final Set<InetSocketAddress> addresses, final Options options,
final Function<LDAPClientContext,
- ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
LdapException> requestHandlerFactory) throws IOException {
this(addresses, requestHandlerFactory, options, null);
}
@@ -97,7 +95,7 @@
*/
public GrizzlyLDAPListener(final Set<InetSocketAddress> addresses,
final Function<LDAPClientContext,
- ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
LdapException> requestHandlerFactory,
final Options options, TCPNIOTransport transport) throws IOException {
@@ -105,13 +103,7 @@
this.options = Options.copyOf(options);
final LDAPServerFilter serverFilter = new LDAPServerFilter(requestHandlerFactory, options,
options.get(LDAP_DECODE_OPTIONS), options.get(MAX_CONCURRENT_REQUESTS));
- final FilterChain ldapChain = GrizzlyUtils.buildFilterChain(this.transport.get().getProcessor(),
- new LdapCodec(options.get(REQUEST_MAX_SIZE_IN_BYTES), options.get(LDAP_DECODE_OPTIONS)) {
- @Override
- protected void onLdapCodecError(FilterChainContext ctx, Throwable error) {
- serverFilter.exceptionOccurred(ctx, error);
- }
- }, serverFilter);
+ final FilterChain ldapChain = GrizzlyUtils.buildFilterChain(this.transport.get().getProcessor(), serverFilter);
final TCPNIOBindingHandler bindingHandler = TCPNIOBindingHandler.builder(this.transport.get())
.processor(ldapChain).build();
this.serverConnections = new ArrayList<>(addresses.size());
@@ -128,16 +120,14 @@
if (isClosed.compareAndSet(false, true)) {
try {
for (TCPNIOConnection serverConnection : serverConnections) {
- serverConnection.close().get();
+ serverConnection.closeSilently();
}
- } catch (final InterruptedException e) {
- // Cannot handle here.
- Thread.currentThread().interrupt();
} catch (final Exception e) {
// TODO: I18N
logger.warn(LocalizableMessage.raw("Exception occurred while closing listener", e));
+ } finally {
+ transport.release();
}
- transport.release();
}
}
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
index dcae91c..b6f059d 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
@@ -17,7 +17,7 @@
package org.forgerock.opendj.grizzly;
-import static org.forgerock.opendj.ldap.CommonLDAPOptions.LDAP_DECODE_OPTIONS;
+import static org.forgerock.opendj.ldap.LDAPListener.LDAP_DECODE_OPTIONS;
import static org.forgerock.opendj.ldap.ResultCode.CLIENT_SIDE_LOCAL_ERROR;
import static org.forgerock.opendj.ldap.responses.Responses.newResult;
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 189f31c..6e1b492 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -39,6 +39,7 @@
import org.forgerock.opendj.ldap.DecodeException;
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.LDAPClientContext;
+import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
@@ -49,18 +50,16 @@
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
-import org.forgerock.opendj.ldap.spi.LdapMessages;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
import org.glassfish.grizzly.Connection;
-import org.glassfish.grizzly.Grizzly;
-import org.glassfish.grizzly.attributes.Attribute;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.Filter;
import org.glassfish.grizzly.filterchain.FilterChain;
+import org.glassfish.grizzly.filterchain.FilterChainBuilder;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.ssl.SSLFilter;
@@ -82,11 +81,8 @@
*/
final class LDAPServerFilter extends BaseFilter {
- private static final Attribute<ClientConnectionImpl> LDAP_CONNECTION_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
- .createAttribute("LDAPServerConnection");
-
private final Function<LDAPClientContext,
- ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
LdapException> connectionHandlerFactory;
/**
@@ -119,6 +115,8 @@
private final int maxConcurrentRequests;
private final Options connectionOptions;
+ private DecodeOptions decodeOptions;
+
/**
* Creates a server filter with provided listener, options and max size of ASN1 element.
*
@@ -131,11 +129,12 @@
*/
LDAPServerFilter(
final Function<LDAPClientContext,
- ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
LdapException> connectionHandlerFactory,
final Options connectionOptions, final DecodeOptions options, final int maxPendingRequests) {
this.connectionHandlerFactory = connectionHandlerFactory;
this.connectionOptions = connectionOptions;
+ this.decodeOptions = options;
this.maxConcurrentRequests = maxPendingRequests;
}
@@ -146,13 +145,23 @@
connection.configureBlocking(false);
final ClientConnectionImpl clientContext = new ClientConnectionImpl(connection);
- LDAP_CONNECTION_ATTR.set(connection, clientContext);
- final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> requestHandler =
- connectionHandlerFactory.apply(clientContext);
+ connection.setProcessor(FilterChainBuilder
+ .stateless()
+ .addAll((FilterChain) connection.getProcessor())
+ .add(new LdapCodec(connectionOptions.get(LDAPListener.REQUEST_MAX_SIZE_IN_BYTES), decodeOptions) {
+ @Override
+ protected void onLdapCodecError(final FilterChainContext ctx, final Throwable error) {
+ clientContext.exceptionOccurred(ctx, error);
+ }
+ })
+ .add(clientContext)
+ .build());
- clientContext.read().flatMap(new Function<LdapRawMessage, Publisher<Void>, Exception>() {
+ final ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>> requestHandler =
+ connectionHandlerFactory.apply(clientContext);
+ clientContext.read().flatMap(new Function<LdapRequestEnvelope, Publisher<Void>, Exception>() {
@Override
- public Publisher<Void> apply(final LdapRawMessage rawRequest) throws Exception {
+ public Publisher<Void> apply(final LdapRequestEnvelope rawRequest) throws Exception {
if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
clientContext.notifyConnectionClosed(rawRequest);
return emptyStream();
@@ -185,7 +194,7 @@
// Swallow the error to prevent the subscribe() below to report it on the console.
return emptyStream();
}
- }).onCompleteDo(new Action() {
+ }).onComplete(new Action() {
@Override
public void run() throws Exception {
clientContext.notifyConnectionClosed(null);
@@ -194,30 +203,16 @@
return ctx.getStopAction();
}
- private final LdapResponseMessage toLdapResponseMessage(final LdapRawMessage rawRequest, final Result result) {
- switch (rawRequest.getMessageType()) {
- case OP_TYPE_ADD_REQUEST:
- return newResponseMessage(OP_TYPE_ADD_RESPONSE, rawRequest.getMessageId(), result);
- case OP_TYPE_BIND_REQUEST:
- return newResponseMessage(OP_TYPE_BIND_RESPONSE, rawRequest.getMessageId(), result);
- case OP_TYPE_COMPARE_REQUEST:
- return newResponseMessage(OP_TYPE_COMPARE_RESPONSE, rawRequest.getMessageId(), result);
- case OP_TYPE_DELETE_REQUEST:
- return newResponseMessage(OP_TYPE_DELETE_RESPONSE, rawRequest.getMessageId(), result);
- case OP_TYPE_EXTENDED_REQUEST:
- return newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, rawRequest.getMessageId(), result);
- case OP_TYPE_MODIFY_DN_REQUEST:
- return newResponseMessage(OP_TYPE_MODIFY_DN_RESPONSE, rawRequest.getMessageId(), result);
- case OP_TYPE_MODIFY_REQUEST:
- return newResponseMessage(OP_TYPE_MODIFY_RESPONSE, rawRequest.getMessageId(), result);
- case OP_TYPE_SEARCH_REQUEST:
- return newResponseMessage(OP_TYPE_SEARCH_RESULT_DONE, rawRequest.getMessageId(), result);
- default:
+ private final LdapResponseMessage toLdapResponseMessage(final LdapRequestEnvelope rawRequest, final Result result) {
+ final byte resultType = OP_TO_RESULT_TYPE[rawRequest.getMessageType()];
+ if (resultType == 0) {
throw new IllegalArgumentException("Unknown request: " + rawRequest.getMessageType());
}
+ return newResponseMessage(resultType, rawRequest.getMessageId(), result);
}
- private Function<Response, LdapResponseMessage, Exception> toLdapResponseMessage(final LdapRawMessage rawRequest) {
+ private Function<Response, LdapResponseMessage, Exception> toLdapResponseMessage(
+ final LdapRequestEnvelope rawRequest) {
return new Function<Response, LdapResponseMessage, Exception>() {
@Override
public LdapResponseMessage apply(final Response response) {
@@ -233,58 +228,32 @@
if (response instanceof SearchResultReference) {
return newResponseMessage(OP_TYPE_SEARCH_RESULT_REFERENCE, rawRequest.getMessageId(), response);
}
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException(
+ "Not implemented for a response of type " + (response != null ? response.getClass() : null));
}
};
}
- @Override
- public NextAction handleRead(final FilterChainContext ctx) throws IOException {
- final ClientConnectionImpl clientContext = LDAP_CONNECTION_ATTR.get(ctx.getConnection());
- if (clientContext != null) {
- return clientContext.handleRead(ctx);
- }
- ctx.suspend();
- return ctx.getSuspendAction();
- }
-
- @Override
- public void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
- final ClientConnectionImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
- if (clientContext != null) {
- clientContext.exceptionOccurred(ctx, error);
- }
- }
-
- @Override
- public NextAction handleClose(final FilterChainContext ctx) throws IOException {
- final ClientConnectionImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
- if (clientContext != null) {
- clientContext.handleClose(ctx);
- }
- return ctx.getStopAction();
- }
-
- final class ClientConnectionImpl implements LDAPClientContext {
+ final class ClientConnectionImpl extends BaseFilter implements LDAPClientContext {
final class GrizzlyBackpressureSubscription implements Subscription {
private final AtomicLong pendingRequests = new AtomicLong();
- private Subscriber<? super LdapRawMessage> subscriber;
+ private Subscriber<? super LdapRequestEnvelope> subscriber;
volatile FilterChainContext ctx;
- GrizzlyBackpressureSubscription(Subscriber<? super LdapRawMessage> subscriber) {
+ GrizzlyBackpressureSubscription(Subscriber<? super LdapRequestEnvelope> subscriber) {
this.subscriber = subscriber;
subscriber.onSubscribe(this);
}
NextAction handleRead(final FilterChainContext ctx) {
- final Subscriber<? super LdapRawMessage> sub = subscriber;
+ final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
if (sub == null) {
// Subscription cancelled. Stop reading
ctx.suspend();
return ctx.getSuspendAction();
}
- sub.onNext((LdapRawMessage) ctx.getMessage());
+ sub.onNext((LdapRequestEnvelope) ctx.getMessage());
if (BackpressureHelper.produced(pendingRequests, 1) > 0) {
return ctx.getStopAction();
}
@@ -302,7 +271,7 @@
}
public void onError(final Throwable error) {
- final Subscriber<? super LdapRawMessage> sub = subscriber;
+ final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
if (sub != null) {
subscriber = null;
sub.onError(error);
@@ -310,7 +279,7 @@
}
public void onComplete() {
- final Subscriber<? super LdapRawMessage> sub = subscriber;
+ final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
if (sub != null) {
subscriber = null;
sub.onComplete();
@@ -325,16 +294,17 @@
private final Connection<?> connection;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
- private final List<DisconnectListener> listeners = new LinkedList<>();
+ private final List<ConnectionEventListener> listeners = new LinkedList<>();
private SaslServer saslServer;
- GrizzlyBackpressureSubscription upstream;
+ private GrizzlyBackpressureSubscription downstream;
private ClientConnectionImpl(final Connection<?> connection) {
this.connection = connection;
}
- NextAction handleRead(final FilterChainContext ctx) {
- final GrizzlyBackpressureSubscription immutableRef = upstream;
+ @Override
+ public NextAction handleRead(final FilterChainContext ctx) {
+ final GrizzlyBackpressureSubscription immutableRef = downstream;
if (immutableRef != null) {
return immutableRef.handleRead(ctx);
}
@@ -342,29 +312,35 @@
return ctx.getSuspendAction();
}
- void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
- final GrizzlyBackpressureSubscription immutableRef = upstream;
- if (immutableRef != null) {
+ @Override
+ public void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
+ final GrizzlyBackpressureSubscription immutableRef = downstream;
+ if (immutableRef == null) {
+ ctx.getConnection().closeSilently();
+ } else {
immutableRef.onError(error);
}
}
- NextAction handleClose(final FilterChainContext ctx) {
- final GrizzlyBackpressureSubscription immutableRef = upstream;
- if (immutableRef != null) {
- immutableRef.onComplete();
+ @Override
+ public NextAction handleClose(final FilterChainContext ctx) {
+ if (isClosed.compareAndSet(false, true)) {
+ final GrizzlyBackpressureSubscription immutableRef = downstream;
+ if (immutableRef != null) {
+ downstream.onComplete();
+ }
}
return ctx.getStopAction();
}
- Stream<LdapRawMessage> read() {
- return streamFromPublisher(new Publisher<LdapRawMessage>() {
+ Stream<LdapRequestEnvelope> read() {
+ return streamFromPublisher(new Publisher<LdapRequestEnvelope>() {
@Override
- public void subscribe(final Subscriber<? super LdapRawMessage> subscriber) {
- if (upstream != null) {
+ public void subscribe(final Subscriber<? super LdapRequestEnvelope> subscriber) {
+ if (downstream != null) {
return;
}
- upstream = new GrizzlyBackpressureSubscription(subscriber);
+ downstream = new GrizzlyBackpressureSubscription(subscriber);
}
});
}
@@ -382,7 +358,7 @@
public boolean enableTLS(final SSLEngine sslEngine, final boolean startTls) {
Reject.ifNull(sslEngine, "sslEngine must not be null");
synchronized (this) {
- if (isFilterExists(SSLFilter.class)) {
+ if (filterExists(SSLFilter.class)) {
return false;
}
SSLUtils.setSSLEngine(connection, sslEngine);
@@ -392,10 +368,17 @@
}
@Override
+ public SSLSession getSSLSession() {
+ final SSLEngine sslEngine = SSLUtils.getSSLEngine(connection);
+ return sslEngine != null ? sslEngine.getSession() : null;
+ }
+
+ @Override
public void enableSASL(final SaslServer saslServer) {
Reject.ifNull(saslServer, "saslServer must not be null");
synchronized (this) {
- if (isFilterExists(SaslFilter.class)) {
+ if (filterExists(SaslFilter.class)) {
+ // FIXME: The current saslServer must be replaced with the new one
return;
}
this.saslServer = saslServer;
@@ -404,6 +387,11 @@
}
@Override
+ public SaslServer getSASLServer() {
+ return saslServer;
+ }
+
+ @Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) connection.getLocalAddress();
}
@@ -438,15 +426,15 @@
int ssf = 0;
final String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
if (SaslFilter.SASL_AUTH_INTEGRITY.equalsIgnoreCase(qop)) {
- ssf = 1;
+ return 1;
} else if (SaslFilter.SASL_AUTH_CONFIDENTIALITY.equalsIgnoreCase(qop)) {
final String negStrength = (String) saslServer.getNegotiatedProperty(Sasl.STRENGTH);
if ("low".equalsIgnoreCase(negStrength)) {
- ssf = 40;
+ return 40;
} else if ("medium".equalsIgnoreCase(negStrength)) {
- ssf = 56;
+ return 56;
} else if ("high".equalsIgnoreCase(negStrength)) {
- ssf = 128;
+ return 128;
}
/*
* Treat anything else as if not security is provided and keep the server running
@@ -456,12 +444,6 @@
}
@Override
- public SSLSession getSSLSession() {
- final SSLEngine sslEngine = SSLUtils.getSSLEngine(connection);
- return sslEngine != null ? sslEngine.getSession() : null;
- }
-
- @Override
public String toString() {
final StringBuilder builder = new StringBuilder();
builder.append("LDAPClientContext(");
@@ -482,7 +464,7 @@
GrizzlyUtils.addFilterToConnection(filter, connection);
}
- private boolean isFilterExists(Class<?> filterKlass) {
+ private boolean filterExists(Class<?> filterKlass) {
synchronized (this) {
final FilterChain currentFilterChain = (FilterChain) connection.getProcessor();
for (final Filter filter : currentFilterChain) {
@@ -495,7 +477,7 @@
}
@Override
- public void onDisconnect(DisconnectListener listener) {
+ public void addConnectionEventListener(ConnectionEventListener listener) {
Reject.ifNull(listener, "listener must not be null");
listeners.add(listener);
}
@@ -504,8 +486,8 @@
public void disconnect() {
if (isClosed.compareAndSet(false, true)) {
try {
- for (DisconnectListener listener : listeners) {
- listener.connectionDisconnected(this, null, null);
+ for (ConnectionEventListener listener : listeners) {
+ listener.handleConnectionDisconnected(this, null, null);
}
} finally {
closeConnection();
@@ -514,9 +496,9 @@
}
private void closeConnection() {
- if (upstream != null) {
- upstream.cancel();
- upstream = null;
+ if (downstream != null) {
+ downstream.cancel();
+ downstream = null;
}
connection.closeSilently();
}
@@ -526,11 +508,11 @@
// Close this connection context.
if (isClosed.compareAndSet(false, true)) {
sendUnsolicitedNotification(Responses.newGenericExtendedResult(resultCode)
- .setOID(LDAP.OID_NOTICE_OF_DISCONNECTION)
+ .setOID(OID_NOTICE_OF_DISCONNECTION)
.setDiagnosticMessage(diagnosticMessage));
try {
- for (DisconnectListener listener : listeners) {
- listener.connectionDisconnected(this, resultCode, diagnosticMessage);
+ for (ConnectionEventListener listener : listeners) {
+ listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage);
}
} finally {
closeConnection();
@@ -538,11 +520,11 @@
}
}
- private void notifyConnectionClosed(final LdapRawMessage unbindRequest) {
+ private void notifyConnectionClosed(final LdapRequestEnvelope unbindRequest) {
// Close this connection context.
if (isClosed.compareAndSet(false, true)) {
if (unbindRequest == null) {
- doNotifySilentlyConnectionClosed(null);
+ notifySilentlyConnectionClosed(null);
} else {
try {
LDAP.getReader(unbindRequest.getContent(), new DecodeOptions())
@@ -550,23 +532,23 @@
@Override
public void unbindRequest(int messageID, UnbindRequest unbindRequest)
throws DecodeException, IOException {
- doNotifySilentlyConnectionClosed(unbindRequest);
+ notifySilentlyConnectionClosed(unbindRequest);
}
});
} catch (Exception e) {
- doNotifySilentlyConnectionClosed(null);
+ notifySilentlyConnectionClosed(null);
}
}
}
}
- private void doNotifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
+ private void notifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
try {
- for (final DisconnectListener listener : listeners) {
+ for (final ConnectionEventListener listener : listeners) {
try {
- listener.connectionClosed(this, unbindRequest);
+ listener.handleConnectionClosed(this, unbindRequest);
} catch (Exception e) {
- // TODO: Log as a warning ?
+ logger.traceException(e);
}
}
} finally {
@@ -578,11 +560,11 @@
// Close this connection context.
if (isClosed.compareAndSet(false, true)) {
try {
- for (final DisconnectListener listener : listeners) {
+ for (final ConnectionEventListener listener : listeners) {
try {
- listener.exceptionOccurred(this, error);
+ listener.handleConnectionError(this, error);
} catch (Exception e) {
- // TODO: Log as a warning ?
+ logger.traceException(e);
}
}
} finally {
@@ -598,7 +580,7 @@
@Override
public void sendUnsolicitedNotification(final ExtendedResult notification) {
- connection.write(LdapMessages.newResponseMessage(LDAP.OP_TYPE_EXTENDED_RESPONSE, 0, notification));
+ connection.write(newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, 0, notification));
}
}
}
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
index ed2e6f7..b20e71c 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -15,12 +15,15 @@
*/
package org.forgerock.opendj.grizzly;
+import static com.forgerock.opendj.ldap.CoreMessages.ERR_LDAP_CLIENT_DECODE_MAX_REQUEST_SIZE_EXCEEDED;
import static org.forgerock.opendj.io.LDAP.*;
+import static org.forgerock.opendj.ldap.spi.LdapMessages.newRequestEnvelope;
import java.io.IOException;
import org.forgerock.opendj.io.LDAPWriter;
import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.DecodeException;
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.CompareResult;
@@ -30,40 +33,31 @@
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
-import org.forgerock.opendj.ldap.spi.LdapMessages;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
import org.glassfish.grizzly.Buffer;
-import org.glassfish.grizzly.Grizzly;
-import org.glassfish.grizzly.attributes.Attribute;
import org.glassfish.grizzly.attributes.AttributeStorage;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
+/**
+ * Decodes {@link LdapRequestEnvelope} and encodes {@link Response}. This class keeps a state to handler the Ldap V2
+ * detection, a LdapCodec instance cannot be shared accross different connection
+ */
abstract class LdapCodec extends LDAPBaseFilter {
- private static final Attribute<Boolean> IS_LDAP_V2 = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
- .createAttribute(LdapCodec.class.getName() + ".IsLdapV2", Boolean.FALSE);
-
- private static final Attribute<Boolean> IS_LDAP_V2_PENDING = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
- .createAttribute(LdapCodec.class.getName() + ".PendingLdapV2", Boolean.FALSE);
+ private boolean isLdapV2Pending = false;
+ private boolean isLdapV2 = false;
LdapCodec(final int maxElementSize, final DecodeOptions decodeOptions) {
super(decodeOptions, maxElementSize);
}
@Override
- public NextAction handleAccept(FilterChainContext ctx) throws IOException {
- // Default value mechanism of Grizzly's attribute doesn't seems to work.
- IS_LDAP_V2.set(ctx.getConnection(), Boolean.FALSE);
- return ctx.getInvokeAction();
- }
-
- @Override
public NextAction handleRead(final FilterChainContext ctx) throws IOException {
try {
final Buffer buffer = ctx.getMessage();
- final LdapRawMessage message;
+ final LdapRequestEnvelope message;
message = readMessage(buffer, ctx.getConnection());
if (message != null) {
@@ -73,9 +67,7 @@
return ctx.getStopAction(getRemainingBuffer(buffer));
} catch (Exception e) {
onLdapCodecError(ctx, e);
- // make the connection deaf to any following input
- // onLdapDecodeError call will take care of error processing
- // and closing the connection
+ ctx.getConnection().closeSilently();
final NextAction suspendAction = ctx.getSuspendAction();
ctx.completeAndRecycle();
return suspendAction;
@@ -84,7 +76,7 @@
protected abstract void onLdapCodecError(FilterChainContext ctx, Throwable error);
- private LdapRawMessage readMessage(final Buffer buffer, final AttributeStorage attributeStorage)
+ private LdapRequestEnvelope readMessage(final Buffer buffer, final AttributeStorage attributeStorage)
throws IOException {
try (final ASN1BufferReader reader = new ASN1BufferReader(maxASN1ElementSize, buffer)) {
final int packetStart = buffer.position();
@@ -95,7 +87,8 @@
final int length = reader.peekLength();
if (length > maxASN1ElementSize) {
buffer.position(packetStart);
- throw new IllegalStateException();
+ throw DecodeException.fatalError(
+ ERR_LDAP_CLIENT_DECODE_MAX_REQUEST_SIZE_EXCEEDED.get(length, maxASN1ElementSize));
}
final Buffer packet = buffer.slice(packetStart, buffer.position() + length);
@@ -105,7 +98,7 @@
}
}
- private LdapRawMessage decodePacket(final ASN1BufferReader reader, final AttributeStorage attributeStorage)
+ private LdapRequestEnvelope decodePacket(final ASN1BufferReader reader, final AttributeStorage attributeStorage)
throws IOException {
reader.mark();
try {
@@ -114,17 +107,17 @@
final byte messageType = reader.peekType();
final ByteString rawDn;
- final int protocolVersion;
+ final int ldapVersion;
switch (messageType) {
case OP_TYPE_BIND_REQUEST:
reader.readStartSequence(messageType);
- protocolVersion = (int) reader.readInteger();
+ ldapVersion = (int) reader.readInteger();
rawDn = reader.readOctetString();
- IS_LDAP_V2_PENDING.set(attributeStorage, protocolVersion == 2);
+ isLdapV2Pending = ldapVersion == 2;
break;
case OP_TYPE_DELETE_REQUEST:
rawDn = reader.readOctetString(messageType);
- protocolVersion = -1;
+ ldapVersion = -1;
break;
case OP_TYPE_ADD_REQUEST:
case OP_TYPE_COMPARE_REQUEST:
@@ -133,13 +126,13 @@
case OP_TYPE_SEARCH_REQUEST:
reader.readStartSequence(messageType);
rawDn = reader.readOctetString();
- protocolVersion = -1;
+ ldapVersion = -1;
break;
default:
rawDn = null;
- protocolVersion = -1;
+ ldapVersion = -1;
}
- return LdapMessages.newRawMessage(messageType, messageId, protocolVersion, rawDn, reader);
+ return newRequestEnvelope(messageType, messageId, ldapVersion, rawDn, reader);
} finally {
reader.reset();
}
@@ -152,11 +145,10 @@
@Override
public NextAction handleWrite(final FilterChainContext ctx) throws IOException {
final LdapResponseMessage response = ctx.<LdapResponseMessage>getMessage();
- if (response.getMessageType() == OP_TYPE_BIND_RESPONSE) {
- final Boolean isLdapV2 = IS_LDAP_V2_PENDING.remove(ctx.getConnection());
- IS_LDAP_V2.set(ctx.getConnection(), isLdapV2);
+ if (response.getMessageType() == OP_TYPE_BIND_RESPONSE && ((BindResult) response.getContent()).isSuccess()) {
+ isLdapV2 = isLdapV2Pending;
}
- final int protocolVersion = IS_LDAP_V2.get(ctx.getConnection()) ? 2 : 3;
+ final int protocolVersion = isLdapV2 ? 2 : 3;
final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(ctx.getMemoryManager(), protocolVersion);
try {
@@ -165,9 +157,7 @@
return ctx.getInvokeAction();
} catch (Exception e) {
onLdapCodecError(ctx, e);
- // make the connection deaf to any following input
- // onLdapDecodeError call will take care of error processing
- // and closing the connection
+ ctx.getConnection().closeSilently();
final NextAction suspendAction = ctx.getSuspendAction();
ctx.completeAndRecycle();
return suspendAction;
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
index 007ce33..adecbcf 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -15,7 +15,10 @@
*/
package org.forgerock.opendj.grizzly;
+import java.util.concurrent.CancellationException;
+
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
+import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.WriteHandler;
import org.reactivestreams.Subscriber;
@@ -23,55 +26,79 @@
import com.forgerock.reactive.Completable;
-final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage> {
+final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage>, CompletionHandler {
private final Connection<?> connection;
- private final Completable.Subscriber completable;
+ private final Completable.Subscriber downstream;
private Subscription upstream;
- LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber completable) {
+ LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber downstream) {
this.connection = connection;
- this.completable = completable;
+ this.downstream = downstream;
}
@Override
- public void onSubscribe(Subscription s) {
- this.upstream = s;
- requestMore();
+ public void onSubscribe(final Subscription s) {
+ if (upstream != null) {
+ s.cancel();
+ return;
+ }
+ upstream = s;
+ connection.notifyCanWrite(new WriteHandler() {
+ @Override
+ public void onWritePossible() throws Exception {
+ final Subscription sub = upstream;
+ if (sub != null) {
+ sub.request(1);
+ }
+ }
+
+ @Override
+ public void onError(final Throwable error) {
+ LdapResponseMessageWriter.this.onError(error);
+ }
+ });
}
- private void requestMore() {
- if (connection.canWrite()) {
- upstream.request(1);
- } else {
- connection.notifyCanWrite(new WriteHandler() {
- @Override
- public void onWritePossible() throws Exception {
- upstream.request(1);
- }
+ @Override
+ public void onNext(final LdapResponseMessage message) {
+ connection.write(message).addCompletionHandler(this);
+ }
- @Override
- public void onError(Throwable t) {
- upstream.cancel();
- completable.onError(t);
- }
- });
+ @Override
+ public void completed(final Object result) {
+ final Subscription sub = upstream;
+ if (sub != null) {
+ sub.request(1);
}
}
@Override
- public void onNext(LdapResponseMessage message) {
- connection.write(message);
- requestMore();
+ public void cancelled() {
+ failed(new CancellationException());
}
@Override
- public void onError(Throwable t) {
- completable.onError(t);
+ public void failed(final Throwable error) {
+ onError(error);
+ }
+
+ @Override
+ public void updated(final Object result) {
+ // Nothing to do
+ }
+
+ @Override
+ public void onError(final Throwable error) {
+ upstream.cancel();
+ upstream = null;
+ downstream.onError(error);
}
@Override
public void onComplete() {
- completable.onComplete();
+ upstream.cancel();
+ upstream = null;
+ downstream.onComplete();
}
}
diff --git a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java
index 0d1933b..7660605 100644
--- a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java
+++ b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java
@@ -20,6 +20,7 @@
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.fest.assertions.Assertions.assertThat;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
import static org.forgerock.opendj.ldap.Connections.*;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.*;
import static org.forgerock.opendj.ldap.LdapException.newLdapException;
@@ -34,6 +35,7 @@
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -90,6 +92,8 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
+
/**
* Tests the {@code ConnectionFactory} classes.
*/
@@ -544,11 +548,12 @@
}
});
- LDAPListener listener = new LDAPListener(new InetSocketAddress("127.0.0.1", 0), mockServer);
+ LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS), mockServer));
+ final InetSocketAddress listenerAddr = listener.getSocketAddresses().iterator().next();
try {
- LDAPConnectionFactory clientFactory = new LDAPConnectionFactory(
- ((InetSocketAddress) listener.getSocketAddresses().iterator().next()).getHostName(),
- ((InetSocketAddress) listener.getSocketAddresses().iterator().next()).getPort());
+ LDAPConnectionFactory clientFactory = new LDAPConnectionFactory(listenerAddr.getHostName(),
+ listenerAddr.getPort());
final Connection client = clientFactory.getConnection();
connectLatch.await(TEST_TIMEOUT, TimeUnit.SECONDS);
MockConnectionEventListener mockListener = null;
@@ -627,12 +632,12 @@
}
});
- LDAPListener listener = new LDAPListener(new InetSocketAddress("127.0.0.1", 0), mockServer);
+ LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS), mockServer));
+ final InetSocketAddress listenerAddr = listener.getSocketAddresses().iterator().next();
try {
- LDAPConnectionFactory clientFactory =
- new LDAPConnectionFactory(
- listener.getSocketAddresses().iterator().next().getHostName(),
- listener.getSocketAddresses().iterator().next().getPort());
+ LDAPConnectionFactory clientFactory = new LDAPConnectionFactory(listenerAddr.getHostName(),
+ listenerAddr.getPort());
final Connection client = clientFactory.getConnection();
connectLatch.await(TEST_TIMEOUT, TimeUnit.SECONDS);
try {
diff --git a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java
index 454b641..59f3ab3 100644
--- a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java
+++ b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java
@@ -17,7 +17,7 @@
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.Fail.fail;
-import static org.forgerock.opendj.ldap.CommonLDAPOptions.*;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.*;
import static org.forgerock.opendj.ldap.TestCaseUtils.*;
import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -71,6 +72,8 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
+
/**
* Tests the {@link LDAPConnectionFactory} class.
*/
@@ -349,16 +352,19 @@
private LDAPListener createServer() {
try {
- return new LDAPListener(loopbackWithDynamicPort(),
- new ServerConnectionFactory<LDAPClientContext, Integer>() {
- @Override
- public ServerConnection<Integer> handleAccept(
- final LDAPClientContext clientContext) throws LdapException {
- context.set(clientContext);
- connectLatch.release();
- return serverConnection;
- }
- });
+ return new LDAPListener(
+ Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+ new ServerConnectionFactory<LDAPClientContext, Integer>() {
+ @Override
+ public ServerConnection<Integer> handleAccept(final LDAPClientContext clientContext)
+ throws LdapException {
+ context.set(clientContext);
+ connectLatch.release();
+ return serverConnection;
+ }
+ })
+ );
} catch (IOException e) {
fail("Unable to create LDAP listener", e);
return null;
diff --git a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionTestCase.java b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionTestCase.java
index 9e44e68..297cdc5 100644
--- a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionTestCase.java
+++ b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionTestCase.java
@@ -11,23 +11,23 @@
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions Copyright [year] [name of copyright owner]".
*
- * Copyright 2013-2015 ForgeRock AS.
+ * Copyright 2013-2016 ForgeRock AS.
*/
package org.forgerock.opendj.grizzly;
import static org.fest.assertions.Assertions.assertThat;
-import static org.forgerock.util.time.Duration.duration;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-
+import static org.forgerock.opendj.ldap.LDAPListener.LDAP_DECODE_OPTIONS;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.REQUEST_TIMEOUT;
+import static org.forgerock.util.time.Duration.duration;
+import static org.mockito.Mockito.*;
+
import java.net.InetSocketAddress;
+import java.util.Collections;
import org.forgerock.opendj.ldap.Connections;
-import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.LDAPListener;
+import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.RequestHandler;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.SdkTestCase;
@@ -43,6 +43,8 @@
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
+
/**
* Tests LDAP connection implementation class.
*/
@@ -74,9 +76,9 @@
* and leave the client waiting forever for a response.
*/
@SuppressWarnings("unchecked")
- LDAPListener listener =
- new LDAPListener(address, Connections
- .newServerConnectionFactory(mock(RequestHandler.class)));
+ LDAPListener listener = new LDAPListener(Collections.singleton(address),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+ Connections.newServerConnectionFactory(mock(RequestHandler.class))));
/*
* Use a very long time out in order to prevent the timeout thread from
diff --git a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
index 6a2cedb..459eebd 100644
--- a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
+++ b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
@@ -16,8 +16,20 @@
*/
package org.forgerock.opendj.grizzly;
+import static java.util.Arrays.asList;
+import static org.fest.assertions.Assertions.assertThat;
+import static org.fest.assertions.Fail.fail;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
+import static org.forgerock.opendj.ldap.Connections.*;
+import static org.forgerock.opendj.ldap.LDAPListener.REQUEST_MAX_SIZE_IN_BYTES;
+import static org.forgerock.opendj.ldap.LdapException.newLdapException;
+import static org.forgerock.opendj.ldap.TestCaseUtils.*;
+import static org.forgerock.util.Options.defaultOptions;
+import static org.mockito.Mockito.mock;
+
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
@@ -32,9 +44,9 @@
import org.forgerock.opendj.ldap.LDAPConnectionFactory;
import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LdapException;
+import org.forgerock.opendj.ldap.LdapResultHandler;
import org.forgerock.opendj.ldap.ProviderNotFoundException;
import org.forgerock.opendj.ldap.ResultCode;
-import org.forgerock.opendj.ldap.LdapResultHandler;
import org.forgerock.opendj.ldap.SdkTestCase;
import org.forgerock.opendj.ldap.SearchResultHandler;
import org.forgerock.opendj.ldap.ServerConnection;
@@ -61,16 +73,7 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static java.util.Arrays.asList;
-import static org.fest.assertions.Assertions.*;
-import static org.fest.assertions.Fail.*;
-import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool;
-import static org.forgerock.opendj.ldap.Connections.newRoundRobinLoadBalancer;
-import static org.forgerock.opendj.ldap.LdapException.*;
-import static org.forgerock.opendj.ldap.LDAPListener.*;
-import static org.forgerock.opendj.ldap.TestCaseUtils.*;
-import static org.forgerock.util.Options.defaultOptions;
-import static org.mockito.Mockito.*;
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
/** Tests the LDAPListener class. */
@SuppressWarnings("javadoc")
@@ -201,7 +204,9 @@
public void testCreateLDAPListener() throws Exception {
// test no exception is thrown, which means transport provider is
// correctly loaded
- LDAPListener listener = new LDAPListener(loopbackWithDynamicPort(), mock(ServerConnectionFactory.class));
+ LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+ mock(ServerConnectionFactory.class)));
listener.close();
}
@@ -212,8 +217,10 @@
// test no exception is thrown, which means transport provider is correctly loaded
Options options = defaultOptions().set(TRANSPORT_PROVIDER_CLASS_LOADER,
Thread.currentThread().getContextClassLoader());
- LDAPListener listener =
- new LDAPListener(loopbackWithDynamicPort(), mock(ServerConnectionFactory.class), options);
+ LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS),
+ mock(ServerConnectionFactory.class)),
+ options);
listener.close();
}
@@ -223,8 +230,10 @@
expectedExceptionsMessageRegExp = "^The requested provider 'unknown' .*")
public void testCreateLDAPListenerFailureProviderNotFound() throws Exception {
Options options = defaultOptions().set(TRANSPORT_PROVIDER, "unknown");
- LDAPListener listener
- = new LDAPListener(loopbackWithDynamicPort(), mock(ServerConnectionFactory.class), options);
+ LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS),
+ mock(ServerConnectionFactory.class)),
+ options);
listener.close();
}
@@ -239,8 +248,10 @@
final MockServerConnection serverConnection = new MockServerConnection();
final MockServerConnectionFactory serverConnectionFactory =
new MockServerConnectionFactory(serverConnection);
- final LDAPListener listener = new LDAPListener(loopbackWithDynamicPort(), serverConnectionFactory);
- final InetSocketAddress addr = (InetSocketAddress) listener.getSocketAddresses().iterator().next();
+ final LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+ serverConnectionFactory));
+ final InetSocketAddress addr = listener.firstSocketAddress();
try {
// Connect and close.
final Connection connection =
@@ -268,9 +279,10 @@
final MockServerConnection onlineServerConnection = new MockServerConnection();
final MockServerConnectionFactory onlineServerConnectionFactory =
new MockServerConnectionFactory(onlineServerConnection);
- final LDAPListener onlineServerListener =
- new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
- final InetSocketAddress onlineAddr = onlineServerListener.getSocketAddresses().iterator().next();
+ final LDAPListener onlineServerListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+ onlineServerConnectionFactory));
+ final InetSocketAddress onlineAddr = onlineServerListener.firstSocketAddress();
try {
// Connection pool and load balancing tests.
@@ -312,9 +324,10 @@
};
- final LDAPListener proxyListener =
- new LDAPListener(loopbackWithDynamicPort(), proxyServerConnectionFactory);
- final InetSocketAddress proxyAddr = proxyListener.getSocketAddresses().iterator().next();
+ final LDAPListener proxyListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+ proxyServerConnectionFactory));
+ final InetSocketAddress proxyAddr = proxyListener.firstSocketAddress();
try {
// Connect and close.
final Connection connection =
@@ -349,9 +362,10 @@
final MockServerConnection onlineServerConnection = new MockServerConnection();
final MockServerConnectionFactory onlineServerConnectionFactory =
new MockServerConnectionFactory(onlineServerConnection);
- final LDAPListener onlineServerListener =
- new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
- final InetSocketAddress onlineServerAddr = onlineServerListener.getSocketAddresses().iterator().next();
+ final LDAPListener onlineServerListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+ onlineServerConnectionFactory));
+ final InetSocketAddress onlineServerAddr = onlineServerListener.firstSocketAddress();
try {
// Connection pool and load balancing tests.
@@ -401,10 +415,10 @@
final MockServerConnectionFactory proxyServerConnectionFactory =
new MockServerConnectionFactory(proxyServerConnection);
- final LDAPListener proxyListener =
- new LDAPListener(loopbackWithDynamicPort(), proxyServerConnectionFactory);
- final InetSocketAddress proxyAddr =
- (InetSocketAddress) proxyListener.getSocketAddresses().iterator().next();
+ final LDAPListener proxyListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+ proxyServerConnectionFactory));
+ final InetSocketAddress proxyAddr = (InetSocketAddress) proxyListener.firstSocketAddress();
try {
// Connect, bind, and close.
@@ -444,9 +458,10 @@
final MockServerConnection onlineServerConnection = new MockServerConnection();
final MockServerConnectionFactory onlineServerConnectionFactory =
new MockServerConnectionFactory(onlineServerConnection);
- final LDAPListener onlineServerListener =
- new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
- final InetSocketAddress onlineServerAddr = onlineServerListener.getSocketAddresses().iterator().next();
+ final LDAPListener onlineServerListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+ onlineServerConnectionFactory));
+ final InetSocketAddress onlineServerAddr = onlineServerListener.firstSocketAddress();
try {
final MockServerConnection proxyServerConnection = new MockServerConnection();
@@ -494,8 +509,9 @@
};
- final LDAPListener proxyListener =
- new LDAPListener(loopbackWithDynamicPort(), proxyServerConnectionFactory);
+ final LDAPListener proxyListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+ proxyServerConnectionFactory));
try {
// Connect and close.
final Connection connection =
@@ -529,9 +545,10 @@
final MockServerConnection onlineServerConnection = new MockServerConnection();
final MockServerConnectionFactory onlineServerConnectionFactory =
new MockServerConnectionFactory(onlineServerConnection);
- final LDAPListener onlineServerListener =
- new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
- final InetSocketAddress onlineServerAddr = onlineServerListener.getSocketAddresses().iterator().next();
+ final LDAPListener onlineServerListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+ onlineServerConnectionFactory));
+ final InetSocketAddress onlineServerAddr = onlineServerListener.firstSocketAddress();
try {
final MockServerConnection proxyServerConnection = new MockServerConnection() {
@@ -574,11 +591,12 @@
}
};
- final InetSocketAddress proxyAddr = findFreeSocketAddress();
final MockServerConnectionFactory proxyServerConnectionFactory =
new MockServerConnectionFactory(proxyServerConnection);
- final LDAPListener proxyListener =
- new LDAPListener(proxyAddr, proxyServerConnectionFactory);
+ final LDAPListener proxyListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+ proxyServerConnectionFactory));
+ final InetSocketAddress proxyAddr = proxyListener.firstSocketAddress();
try {
// Connect, bind, and close.
final Connection connection =
@@ -617,8 +635,9 @@
final MockServerConnectionFactory factory =
new MockServerConnectionFactory(serverConnection);
final Options options = defaultOptions().set(REQUEST_MAX_SIZE_IN_BYTES, 2048);
- final InetSocketAddress addr = findFreeSocketAddress();
- final LDAPListener listener = new LDAPListener(addr, factory, options);
+ final LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS), factory), options);
+ final InetSocketAddress addr = listener.firstSocketAddress();
Connection connection = null;
try {
@@ -674,11 +693,10 @@
@Test
public void testServerDisconnect() throws Exception {
final MockServerConnection serverConnection = new MockServerConnection();
- final MockServerConnectionFactory factory =
- new MockServerConnectionFactory(serverConnection);
- final LDAPListener listener =
- new LDAPListener(loopbackWithDynamicPort(), factory);
- final InetSocketAddress listenerAddr = listener.getSocketAddresses().iterator().next();
+ final MockServerConnectionFactory factory = new MockServerConnectionFactory(serverConnection);
+ final LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS), factory));
+ final InetSocketAddress listenerAddr = listener.firstSocketAddress();
final Connection connection;
try {
diff --git a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
index da1f1f8..75f9c42 100644
--- a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
+++ b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
@@ -17,9 +17,8 @@
package org.forgerock.opendj.examples;
-import static org.forgerock.opendj.ldap.LDAPConnectionFactory.AUTHN_BIND_REQUEST;
-import static org.forgerock.opendj.ldap.LDAPConnectionFactory.HEARTBEAT_ENABLED;
-import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
+import static org.forgerock.opendj.ldap.LDAPConnectionFactory.*;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
import java.io.IOException;
@@ -39,6 +38,8 @@
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.util.Options;
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
+
/**
* An LDAP load balancing proxy which forwards requests to one or more remote
* Directory Servers. This is implementation is very simple and is only intended
@@ -142,7 +143,8 @@
final Options options = Options.defaultOptions().set(CONNECT_MAX_BACKLOG, 4096);
LDAPListener listener = null;
try {
- listener = new LDAPListener(localAddress, localPort, connectionHandler, options);
+ listener = new LDAPListener(localAddress, localPort, new ServerConnectionFactoryAdapter(
+ options.get(LDAP_DECODE_OPTIONS), connectionHandler), options);
System.out.println("Press any key to stop the server...");
System.in.read();
} catch (final IOException e) {
diff --git a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
index 108b307..65ae330 100644
--- a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
+++ b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
@@ -12,13 +12,13 @@
* information: "Portions Copyright [year] [name of copyright owner]".
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS.
+ * Portions Copyright 2011-2016 ForgeRock AS.
*/
package org.forgerock.opendj.examples;
import static org.forgerock.opendj.ldap.Connections.newCachedConnectionPool;
-import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
import java.io.IOException;
@@ -64,6 +64,8 @@
import org.forgerock.opendj.ldap.schema.AttributeType;
import org.forgerock.util.Options;
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
+
/**
* This example is based on the {@link Proxy}. This example does no load
* balancing, but instead rewrites attribute descriptions and DN suffixes in
@@ -419,7 +421,8 @@
final Options listenerOptions = Options.defaultOptions().set(CONNECT_MAX_BACKLOG, 4096);
LDAPListener listener = null;
try {
- listener = new LDAPListener(localAddress, localPort, connectionHandler, listenerOptions);
+ listener = new LDAPListener(localAddress, localPort, new ServerConnectionFactoryAdapter(
+ listenerOptions.get(LDAP_DECODE_OPTIONS), connectionHandler), listenerOptions);
System.out.println("Press any key to stop the server...");
System.in.read();
} catch (final IOException e) {
diff --git a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java
index e2c4a2a..5288836 100644
--- a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java
+++ b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java
@@ -17,7 +17,7 @@
package org.forgerock.opendj.examples;
-import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
import java.io.FileInputStream;
import java.io.IOException;
@@ -38,6 +38,8 @@
import org.forgerock.opendj.ldif.LDIFEntryReader;
import org.forgerock.util.Options;
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
+
/**
* An LDAP directory server which exposes data contained in an LDIF file. This
* is implementation is very simple and is only intended as an example:
@@ -116,10 +118,13 @@
}
};
- listener = new LDAPListener(localAddress, localPort, sslWrapper, options);
+ listener = new LDAPListener(localAddress, localPort,
+ new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS), sslWrapper), options);
} else {
// No SSL.
- listener = new LDAPListener(localAddress, localPort, connectionHandler, options);
+ listener = new LDAPListener(localAddress, localPort,
+ new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS), connectionHandler),
+ options);
}
System.out.println("Press any key to stop the server...");
System.in.read();
diff --git a/opendj-ldap-toolkit/src/test/java/com/forgerock/opendj/ldap/tools/ToolLdapServer.java b/opendj-ldap-toolkit/src/test/java/com/forgerock/opendj/ldap/tools/ToolLdapServer.java
index 7e49c4a..f3836af 100644
--- a/opendj-ldap-toolkit/src/test/java/com/forgerock/opendj/ldap/tools/ToolLdapServer.java
+++ b/opendj-ldap-toolkit/src/test/java/com/forgerock/opendj/ldap/tools/ToolLdapServer.java
@@ -16,9 +16,11 @@
package com.forgerock.opendj.ldap.tools;
import static org.fest.assertions.Assertions.assertThat;
-import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
+import static org.forgerock.opendj.ldap.TestCaseUtils.loopbackWithDynamicPort;
+import static org.forgerock.opendj.ldap.LDAPListener.LDAP_DECODE_OPTIONS;
import java.io.IOException;
+import java.util.Collections;
import org.forgerock.opendj.ldap.IntermediateResponseHandler;
import org.forgerock.opendj.ldap.LDAPClientContext;
@@ -44,6 +46,9 @@
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
+import org.forgerock.util.Options;
+
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
/** Mocked request handler sketelon for tools test cases. */
class ToolLdapServer implements ServerConnectionFactory<LDAPClientContext, Integer> {
@@ -153,7 +158,8 @@
}
void start() throws IOException {
- listener = new LDAPListener(findFreeSocketAddress(), this);
+ listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+ new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS), this));
}
void stop() {
@@ -161,10 +167,10 @@
}
String getHostName() {
- return listener.getSocketAddress().getHostName();
+ return listener.getSocketAddresses().iterator().next().getHostName();
}
String getPort() {
- return Integer.toString(listener.getSocketAddress().getPort());
+ return Integer.toString(listener.getSocketAddresses().iterator().next().getPort());
}
}
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java
index 6c56560..d96d06c 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java
@@ -460,7 +460,9 @@
/**
* Converts from OpenDJ server {@link org.opends.server.types.Attribute} to OpenDJ LDAP SDK
- * {@link org.forgerock.opendj.ldap.Attribute}.
+ * {@link org.forgerock.opendj.ldap.Attribute}. This method is an optimization of the equivalent from() method. It's
+ * only used in encoding/decoding and as such only wrap the required methods: the returned object partially wrap the
+ * incoming one.
*
* @param attribute
* value to convert
@@ -625,14 +627,15 @@
}
/**
- * Converts from OpenDJ server
- * {@link org.opends.server.types.SearchResultEntry} to OpenDJ LDAP SDK
- * {@link org.forgerock.opendj.ldap.responses.SearchResultEntry}.
+ * Converts from OpenDJ server {@link org.opends.server.types.SearchResultEntry} to OpenDJ LDAP SDK
+ * {@link org.forgerock.opendj.ldap.responses.SearchResultEntry}. This method is an optimization of the equivalent
+ * from() method. It's only used in encoding/decoding and as such only wrap the required methods: the returned
+ * object partially wrap the incoming one.
*
* @param srvResultEntry
- * value to convert
+ * value to convert
* @param ldapVersion
- * Version of the ldap protocol
+ * Version of the ldap protocol
* @return the converted value
*/
public static org.forgerock.opendj.ldap.responses.SearchResultEntry partiallyWrap(
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index af9a708..2cb5125 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -54,14 +54,14 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
+import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.opends.server.api.ClientConnection;
import org.opends.server.api.ConnectionHandler;
import org.opends.server.core.AbandonOperationBasis;
@@ -123,7 +123,7 @@
* instance of the LDAP connection handler and have its requests decoded by an LDAP request handler.
*/
public final class LDAPClientConnection2 extends ClientConnection implements TLSCapableConnection,
- ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>> {
+ ReactiveHandler<QueueingStrategy, LdapRequestEnvelope, Stream<Response>> {
private static final String REACTIVE_OUT = "reactive.out";
/** The tracer object for the debug logger. */
@@ -230,9 +230,9 @@
}
connectionID = DirectoryServer.newConnectionAccepted(this);
- clientContext.onDisconnect(new DisconnectListener() {
+ clientContext.addConnectionEventListener(new ConnectionEventListener() {
@Override
- public void exceptionOccurred(LDAPClientContext context, Throwable error) {
+ public void handleConnectionError(LDAPClientContext context, Throwable error) {
if (error instanceof LocalizableException) {
disconnect(
DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
@@ -242,13 +242,13 @@
}
@Override
- public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
+ public void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode,
String diagnosticMessage) {
disconnect(DisconnectReason.SERVER_ERROR, false, null);
}
@Override
- public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
+ public void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
}
});
@@ -372,7 +372,7 @@
*/
@Override
public boolean isSecure() {
- return false;
+ return clientContext.getSSLSession() != null || clientContext.getSASLServer() != null;
}
/**
@@ -405,7 +405,7 @@
// an error result to the client indicating that a problem occurred.
if (removeOperationInProgress(operation.getMessageID())) {
final Response response = operationToResponse(operation);
- final FlowableEmitter<Response> out = getOut(operation);
+ final FlowableEmitter<Response> out = getAttachedEmitter(operation);
if (response != null) {
out.onNext(response);
}
@@ -534,22 +534,18 @@
* The search result entry to be sent to the client
*/
@Override
- public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry) {
- getEmitter(searchOperation).onNext(toResponse(searchEntry));
+ public void sendSearchEntry(final SearchOperation searchOperation, final SearchResultEntry searchEntry) {
+ getAttachedEmitter(searchOperation).onNext(toResponse(searchEntry));
}
- private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation) {
- return getOut(searchOperation);
- }
-
- private Response toResponse(SearchResultEntry searchEntry) {
- return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
- }
-
- private FlowableEmitter<Response> getOut(Operation operation) {
+ private FlowableEmitter<Response> getAttachedEmitter(final Operation operation) {
return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
}
+ private Response toResponse(final SearchResultEntry searchEntry) {
+ return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
+ }
+
/**
* Sends the provided search result reference to the client.
*
@@ -572,7 +568,7 @@
return false;
}
- final FlowableEmitter<Response> out = getOut(searchOperation);
+ final FlowableEmitter<Response> out = getAttachedEmitter(searchOperation);
out.onNext(Converters.from(searchReference));
return true;
@@ -589,7 +585,7 @@
@Override
protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse) {
final Operation operation = intermediateResponse.getOperation();
- final FlowableEmitter<Response> out = getOut(operation);
+ final FlowableEmitter<Response> emitter = getAttachedEmitter(operation);
final Response response = Responses.newGenericIntermediateResponse(intermediateResponse.getOID(),
intermediateResponse.getValue());
@@ -597,7 +593,7 @@
response.addControl(Converters.from(control));
}
- out.onNext(response);
+ emitter.onNext(response);
// The only reason we shouldn't continue processing is if the
// connection is closed.
@@ -962,14 +958,15 @@
* a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
*/
@Override
- public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
- return streamFromPublisher(
- new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
+ public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) {
+ return streamFromPublisher(new BlockingBackpressureSubscription(
+ Flowable.create(new FlowableOnSubscribe<Response>() {
@Override
public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
}
- }, BackpressureStrategy.ERROR))).onNextDo(new Consumer<Response>() {
+ }, BackpressureStrategy.ERROR)))
+ .onNext(new Consumer<Response>() {
@Override
public void accept(final Response response) throws Exception {
if (keepStats) {
@@ -1003,7 +1000,7 @@
}
}
- private final byte toLdapResponseType(final LdapRawMessage rawRequest, final Response response) {
+ private final byte toLdapResponseType(final LdapRequestEnvelope rawRequest, final Response response) {
if (response instanceof Result) {
return toLdapResultType(rawRequest.getMessageType());
}
@@ -1160,8 +1157,8 @@
final List<Control> controls, final FlowableEmitter<Response> out) {
if (ldapVersion == 2 && !controls.isEmpty()) {
// LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
disconnectControlsNotAllowed();
return false;
@@ -1229,17 +1226,17 @@
versionString = "2";
if (!connectionHandler.allowLDAPv2()) {
- out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
- disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
+ disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
return false;
}
if (!controls.isEmpty()) {
// LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
disconnectControlsNotAllowed();
return false;
@@ -1390,8 +1387,8 @@
final List<Control> controls, final FlowableEmitter<Response> out) {
if (ldapVersion == 2 && !controls.isEmpty()) {
// LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
disconnectControlsNotAllowed();
return false;
@@ -1514,8 +1511,8 @@
final List<Control> controls, final FlowableEmitter<Response> out) {
if (ldapVersion == 2 && !controls.isEmpty()) {
// LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
disconnectControlsNotAllowed();
return false;
@@ -1569,8 +1566,8 @@
final List<Control> controls, final FlowableEmitter<Response> out) {
if (ldapVersion == 2 && !controls.isEmpty()) {
// LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
disconnectControlsNotAllowed();
return false;
@@ -1626,8 +1623,8 @@
final List<Control> controls, final FlowableEmitter<Response> out) {
if (ldapVersion == 2 && !controls.isEmpty()) {
// LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
- ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+ .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
out.onComplete();
disconnectControlsNotAllowed();
return false;
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
index b7dc22f..5b5d2ba 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -49,17 +49,16 @@
import org.forgerock.opendj.config.server.ConfigChangeResult;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.config.server.ConfigurationChangeListener;
-import org.forgerock.opendj.grizzly.GrizzlyLDAPListener;
import org.forgerock.opendj.ldap.AddressMask;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
+import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.Response;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.opendj.server.config.server.ConnectionHandlerCfg;
import org.forgerock.opendj.server.config.server.LDAPConnectionHandlerCfg;
import org.forgerock.util.Function;
@@ -129,7 +128,7 @@
/** SSL instance name used in context creation. */
private static final String SSL_CONTEXT_INSTANCE_NAME = "TLS";
- private GrizzlyLDAPListener listener;
+ private LDAPListener listener;
/** The current configuration state. */
private LDAPConnectionHandlerCfg currentConfig;
@@ -636,44 +635,45 @@
}
private void startListener() throws IOException {
- listener = new GrizzlyLDAPListener(
+ listener = new LDAPListener(
listenAddresses,
- Options.defaultOptions().set(LDAPListener.CONNECT_MAX_BACKLOG, backlog)
- .set(LDAPListener.REQUEST_MAX_SIZE_IN_BYTES, (int) currentConfig.getMaxRequestSize()),
new Function<LDAPClientContext,
- ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
+ ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
LdapException>() {
@Override
- public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> apply(
+ public ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>> apply(
LDAPClientContext clientContext) throws LdapException {
final LDAPClientConnection2 conn = canAccept(clientContext);
connectionList.add(conn);
- clientContext.onDisconnect(new DisconnectListener() {
+ clientContext.addConnectionEventListener(new ConnectionEventListener() {
@Override
- public void exceptionOccurred(LDAPClientContext context, Throwable error) {
+ public void handleConnectionError(final LDAPClientContext context, final Throwable error) {
connectionList.remove(conn);
}
@Override
- public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
- String diagnosticMessage) {
+ public void handleConnectionDisconnected(final LDAPClientContext context,
+ final ResultCode resultCode, String diagnosticMessage) {
connectionList.remove(conn);
}
@Override
- public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
+ public void handleConnectionClosed(final LDAPClientContext context,
+ final UnbindRequest unbindRequest) {
connectionList.remove(conn);
}
});
- return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
+ return new ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>() {
@Override
- public Stream<Response> handle(LDAPClientContext context, LdapRawMessage request)
- throws Exception {
+ public Stream<Response> handle(final LDAPClientContext context,
+ final LdapRequestEnvelope request) throws Exception {
return conn.handle(queueingStrategy, request);
}
};
}
- });
+ }, Options.defaultOptions()
+ .set(LDAPListener.CONNECT_MAX_BACKLOG, backlog)
+ .set(LDAPListener.REQUEST_MAX_SIZE_IN_BYTES, (int) currentConfig.getMaxRequestSize()));
}
/**
@@ -684,8 +684,6 @@
public void run() {
setName(handlerName);
boolean starting = true;
- setName(handlerName);
-
boolean lastIterationFailed = false;
while (!shutdownRequested) {
@@ -724,7 +722,6 @@
// If we have gotten here, then we are about to start listening
// for the first time since startup or since we were previously disabled.
- // Start the embedded HTTP server
startListener();
lastIterationFailed = false;
} catch (Exception e) {
--
Gitblit v1.10.0