opendj-core/clirr-ignored-api-changes.xml
@@ -173,7 +173,7 @@ <difference> <className>org/forgerock/opendj/ldap/LDAPClientContext</className> <differenceType>7012</differenceType> <method>void addConnectionEventListener(org.forgerock.opendj.ldap.LDAPClientContext$ConnectionEventListener)</method> <method>void addConnectionEventListener(org.forgerock.opendj.ldap.LDAPClientContextEventListener)</method> <justification>Allows to register connection state listener</justification> </difference> <difference> opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -63,14 +63,14 @@ Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function); /** * Subscribe to the result of this {@link Completable}. * Returns a {@link Completable} instance that calls the given onTerminate callback after this Completable completes * normally or with an exception. * * @param completeAction * An {@link Action} which will be invoked on successful completion * @param errorConsumer * A {@link Consumer} which will be invoked on error * @param onAfterTerminate * the callback to call after this {@link Completable} terminates * @return the new {@link Completable} instance */ void subscribe(Action completeAction, Consumer<Throwable> errorConsumer); Completable doAfterTerminate(Action onAfterTerminate); /** * Returns a {@link Single} which will complete with the provided value once this {@link Completable} completes. @@ -82,4 +82,19 @@ * @return A new {@link Single} */ <V> Single<V> toSingle(V value); /** * Subscribe to the result of this {@link Completable}. * * @param completeAction * An {@link Action} which will be invoked on successful completion * @param errorConsumer * A {@link Consumer} which will be invoked on error */ void subscribe(Action completeAction, Consumer<Throwable> errorConsumer); /** * Subscribes to this {@link Completable}. */ void subscribe(); } opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -379,5 +379,20 @@ } }); } @Override public Completable doAfterTerminate(final Action onTerminate) { return new RxJavaCompletable(impl.doAfterTerminate(new io.reactivex.functions.Action() { @Override public void run() throws Exception { onTerminate.run(); } })); } @Override public void subscribe() { impl.subscribe(); } } } opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
@@ -28,7 +28,7 @@ import org.forgerock.opendj.ldap.DecodeOptions; import org.forgerock.opendj.ldap.IntermediateResponseHandler; import org.forgerock.opendj.ldap.LDAPClientContext; import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener; import org.forgerock.opendj.ldap.LDAPClientContextEventListener; import org.forgerock.opendj.ldap.LdapException; import org.forgerock.opendj.ldap.LdapResultHandler; import org.forgerock.opendj.ldap.ResultCode; @@ -121,7 +121,7 @@ final ServerConnection<Integer> serverConnection) { this.decodeOptions = checkNotNull(decodeOptions, "decodeOptions must not be null"); this.adaptee = checkNotNull(serverConnection, "serverConnection must not be null"); clientContext.addConnectionEventListener(new ConnectionEventListener() { clientContext.addListener(new LDAPClientContextEventListener() { @Override public void handleConnectionError(final LDAPClientContext context, final Throwable error) { adaptee.handleConnectionError(error); opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -18,13 +18,11 @@ package org.forgerock.opendj.ldap; import java.net.InetSocketAddress; import java.util.EventListener; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLSession; import javax.security.sasl.SaslServer; import org.forgerock.opendj.ldap.requests.UnbindRequest; import org.forgerock.opendj.ldap.responses.ExtendedResult; import com.forgerock.reactive.Completable; @@ -37,62 +35,25 @@ */ public interface LDAPClientContext { /** Listens for disconnection event. */ public interface ConnectionEventListener extends EventListener { /** * Invoked when the connection has been disconnected because of an error (e.g: message too big). * Register a listener which will be notified when this {@link LDAPClientContext} changes state. * * @param context * The {@link LDAPClientContext} which has failed * @param error * The error * @param listener The {@link LDAPClientContextEventListener} to register. */ void handleConnectionError(LDAPClientContext context, Throwable error); /** * Invoked when the client closed the connection, possibly using an unbind request. * * @param context * The {@link LDAPClientContext} which has been disconnected * @param unbindRequest * The unbind request, which may be {@code null} if one was not sent before the connection was * closed. */ void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest); /** * Invoked when the connection has been disconnected by the server. * * @param context * The {@link LDAPClientContext} which has been disconnected * @param resultCode * The result code which was included with the disconnect notification, or {@code null} if no * disconnect notification was sent. * @param diagnosticMessage * The diagnostic message, which may be empty or {@code null} indicating that none was provided. 
*/ void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage); } /** * Register a listener which will be notified when this {@link LDAPClientContext} is disconnected. * * @param listener The {@link ConnectionEventListener} to register. */ void addConnectionEventListener(ConnectionEventListener listener); void addListener(LDAPClientContextEventListener listener); /** * Disconnects the client without sending a disconnect notification. Invoking this method causes * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called * before this method returns. * {@link LDAPClientContextEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be * called before this method returns. */ void disconnect(); /** * Disconnects the client and sends a disconnect notification, containing the provided result code and diagnostic * message. Invoking this method causes * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called * before this method returns. * {@link LDAPClientContextEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be * called before this method returns. * * @param resultCode * The result code to include with the disconnect notification opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContextEventListener.java
New file @@ -0,0 +1,58 @@ /* * The contents of this file are subject to the terms of the Common Development and * Distribution License (the License). You may not use this file except in compliance with the * License. * * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the * specific language governing permission and limitations under the License. * * When distributing Covered Software, include this CDDL Header Notice in each file and include * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL * Header, with the fields enclosed by brackets [] replaced by your own identifying * information: "Portions Copyright [year] [name of copyright owner]". * * Copyright 2016 ForgeRock AS. */ package org.forgerock.opendj.ldap; import java.util.EventListener; import org.forgerock.opendj.ldap.requests.UnbindRequest; /** * A listener interface for handling {@link LDAPClientContext} state changes: connection errors, connections closed * by the client, and disconnections initiated by the server. */ public interface LDAPClientContextEventListener extends EventListener { /** * Invoked when the connection has been disconnected because of an error (e.g. message too big). * * @param context * The {@link LDAPClientContext} which has failed * @param error * The error which caused the disconnection */ void handleConnectionError(LDAPClientContext context, Throwable error); /** * Invoked when the client closed the connection, possibly using an unbind request. * * @param context * The {@link LDAPClientContext} which has been disconnected * @param unbindRequest * The unbind request, which may be {@code null} if one was not sent before the connection was * closed. */ void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest); /** * Invoked when the connection has been disconnected by the server. * * @param context * The {@link LDAPClientContext} which has been disconnected * @param resultCode * The result code which was included with the disconnect notification, or {@code null} if no * disconnect notification was sent. 
* @param diagnosticMessage * The diagnostic message, which may be empty or {@code null} indicating that none was provided. */ void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage); } opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -41,6 +41,7 @@ import org.forgerock.opendj.ldap.DecodeException; import org.forgerock.opendj.ldap.DecodeOptions; import org.forgerock.opendj.ldap.LDAPClientContext; import org.forgerock.opendj.ldap.LDAPClientContextEventListener; import org.forgerock.opendj.ldap.LDAPListener; import org.forgerock.opendj.ldap.LdapException; import org.forgerock.opendj.ldap.ResultCode; @@ -76,7 +77,6 @@ import com.forgerock.reactive.Action; import com.forgerock.reactive.Completable; import com.forgerock.reactive.Consumer; import com.forgerock.reactive.ReactiveHandler; import com.forgerock.reactive.Stream; @@ -165,7 +165,7 @@ @Override public Publisher<Void> apply(final LdapRequestEnvelope rawRequest) throws Exception { if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) { clientContext.notifyConnectionClosedSilently(rawRequest); clientContext.notifyConnectionClosedRawUnbind(rawRequest); return emptyStream(); } Stream<Response> response; @@ -192,7 +192,7 @@ }, maxConcurrentRequests).onErrorResumeWith(new Function<Throwable, Publisher<Void>, Exception>() { @Override public Publisher<Void> apply(Throwable error) throws Exception { clientContext.notifyErrorSilently(error); clientContext.notifyConnectionError(error); // Swallow the error to prevent the subscribe() below to report it on the console. 
return emptyStream(); } @@ -306,7 +306,7 @@ private final Connection<?> connection; private volatile boolean isClosed; private final List<ConnectionEventListener> connectionEventListeners = new LinkedList<>(); private final List<LDAPClientContextEventListener> connectionEventListeners = new LinkedList<>(); private SaslServer saslServer; private GrizzlyBackpressureSubscription downstream; @@ -341,7 +341,7 @@ if (immutableRef != null) { immutableRef.onComplete(); } notifyConnectionClosedSilently(null); notifyConnectionClosedRawUnbind(null); return ctx.getStopAction(); } @@ -487,7 +487,7 @@ } @Override public void addConnectionEventListener(ConnectionEventListener listener) { public void addListener(final LDAPClientContextEventListener listener) { Reject.ifNull(listener, "listener must not be null"); synchronized (connectionEventListeners) { connectionEventListeners.add(listener); @@ -496,7 +496,7 @@ @Override public void disconnect() { notifySilentlyConnectionDisconnected(null, null); notifyConnectionDisconnected(null, null); connection.closeSilently(); } @@ -507,46 +507,41 @@ @Override public void disconnect(final ResultCode resultCode, final String diagnosticMessage) { notifySilentlyConnectionDisconnected(resultCode, diagnosticMessage); notifyConnectionDisconnected(resultCode, diagnosticMessage); sendUnsolicitedNotification( newGenericExtendedResult(resultCode) .setOID(OID_NOTICE_OF_DISCONNECTION) .setDiagnosticMessage(diagnosticMessage) ).subscribe(new Action() { ).doAfterTerminate(new Action() { @Override public void run() throws Exception { closeConnection(); } }, new Consumer<Throwable>() { @Override public void accept(final Throwable error) throws Exception { closeConnection(); } }); }).subscribe(); } private void notifyConnectionClosedSilently(final LdapRequestEnvelope unbindRequest) { private void notifyConnectionClosedRawUnbind(final LdapRequestEnvelope rawUnbindRequest) { // Close this connection context. 
if (unbindRequest == null) { notifySilentlyConnectionClosed(null); if (rawUnbindRequest == null) { notifyConnectionClosed(null); } else { try { LDAP.getReader(unbindRequest.getContent(), new DecodeOptions()) LDAP.getReader(rawUnbindRequest.getContent(), new DecodeOptions()) .readMessage(new AbstractLDAPMessageHandler() { @Override public void unbindRequest(int messageID, UnbindRequest unbindRequest) public void unbindRequest(final int messageID, final UnbindRequest unbindRequest) throws DecodeException, IOException { notifySilentlyConnectionClosed(unbindRequest); notifyConnectionClosed(unbindRequest); } }); } catch (Exception e) { notifySilentlyConnectionClosed(null); notifyConnectionClosed(null); } } } private void notifySilentlyConnectionDisconnected(final ResultCode resultCode, final String diagnosticMessage) { for (final ConnectionEventListener listener : getAndClearListeners()) { private void notifyConnectionDisconnected(final ResultCode resultCode, final String diagnosticMessage) { for (final LDAPClientContextEventListener listener : getAndClearListeners()) { try { listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage); } catch (Exception e) { @@ -555,8 +550,8 @@ } } private void notifySilentlyConnectionClosed(final UnbindRequest unbindRequest) { for (final ConnectionEventListener listener : getAndClearListeners()) { private void notifyConnectionClosed(final UnbindRequest unbindRequest) { for (final LDAPClientContextEventListener listener : getAndClearListeners()) { try { listener.handleConnectionClosed(this, unbindRequest); } catch (Exception e) { @@ -565,8 +560,8 @@ } } private void notifyErrorSilently(final Throwable error) { for (final ConnectionEventListener listener : getAndClearListeners()) { private void notifyConnectionError(final Throwable error) { for (final LDAPClientContextEventListener listener : getAndClearListeners()) { try { listener.handleConnectionError(this, error); } catch (Exception e) { @@ -575,9 +570,9 @@ } } 
private ArrayList<ConnectionEventListener> getAndClearListeners() { private List<LDAPClientContextEventListener> getAndClearListeners() { synchronized (connectionEventListeners) { final ArrayList<ConnectionEventListener> listeners = new ArrayList<>(connectionEventListeners); final ArrayList<LDAPClientContextEventListener> listeners = new ArrayList<>(connectionEventListeners); connectionEventListeners.clear(); return listeners; } opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -32,16 +32,14 @@ import java.security.cert.Certificate; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLPeerUnverifiedException; @@ -56,7 +54,7 @@ import org.forgerock.opendj.ldap.ByteString; import org.forgerock.opendj.ldap.DN; import org.forgerock.opendj.ldap.LDAPClientContext; import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener; import org.forgerock.opendj.ldap.LDAPClientContextEventListener; import org.forgerock.opendj.ldap.ResultCode; import org.forgerock.opendj.ldap.requests.UnbindRequest; import org.forgerock.opendj.ldap.responses.CompareResult; @@ -64,6 +62,7 @@ import org.forgerock.opendj.ldap.responses.Responses; import org.forgerock.opendj.ldap.responses.Result; import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope; import org.forgerock.util.Reject; import org.opends.server.api.ClientConnection; import org.opends.server.api.ConnectionHandler; import org.opends.server.core.AbandonOperationBasis; @@ -107,6 +106,7 @@ import org.opends.server.types.SearchResultEntry; import org.opends.server.types.SearchResultReference; import org.opends.server.util.TimeThread; import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -229,7 +229,7 @@ } connectionID = DirectoryServer.newConnectionAccepted(this); clientContext.addConnectionEventListener(new 
ConnectionEventListener() { clientContext.addListener(new LDAPClientContextEventListener() { @Override public void handleConnectionError(LDAPClientContext context, Throwable error) { if (error instanceof LocalizableException) { @@ -1676,17 +1676,17 @@ } /** Upstream -> BlockingBackpressureSubscription -> Downstream. */ private final class BlockingBackpressureSubscription implements Subscription, Publisher<Response>, Subscriber<Response> { private long pendingRequests; private final Queue<Response> queue = new LinkedList<>(); private final Lock lock = new ReentrantLock(); private final Condition spaceAvailable = lock.newCondition(); private final class BlockingBackpressureSubscription implements Subscription, Processor<Response, Response> { private final AtomicLong pendingRequests = new AtomicLong(); private final AtomicInteger missedDrain = new AtomicInteger(); private final BlockingQueue<Response> queue = new LinkedBlockingQueue<>(32); private final Publisher<Response> upstream; private final long writeTimeoutMillis; private boolean upstreamCompleted; private Subscription subscription; private Subscriber<? super Response> downstream; private volatile boolean done; private Throwable error; private volatile boolean cancelled; BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) { this.upstream = upstream; @@ -1697,12 +1697,14 @@ @Override public void subscribe(final Subscriber<? super Response> subscriber) { Reject.ifNull(subscriber); if (downstream != null) { // This publisher only support one subscriber. 
return; } downstream = subscriber; subscriber.onSubscribe(this); upstream.subscribe(this); subscriber.onSubscribe(/* Subscription */ this); upstream.subscribe(/* Subscriber */ this); } @Override @@ -1716,91 +1718,117 @@ } @Override public void request(long n) { lock.lock(); try { if (pendingRequests != Long.MIN_VALUE) { pendingRequests += n; public void request(final long n) { if (n == Long.MAX_VALUE) { pendingRequests.set(Long.MAX_VALUE); } else { // There is a known and accepted problem here regarding reactive-stream contract in the sense that // we're not supporting pendingRequests overflow (pendingRequests + n > Long.MAX_VALUE) for performance // reason since this should never happen in the context we're using it. pendingRequests.addAndGet(n); } drain(); } } finally { lock.unlock(); // Taken from // https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#backpressure-and-cancellation private void drain() { if (missedDrain.getAndIncrement() != 0) { // Another thread is already executing this drain method. return; } int missed = 1; for (;;) { final long immutablePendingRequests = pendingRequests.get(); long emitted = 0L; while (emitted != immutablePendingRequests) { // Check if we should early exit because of cancellation if (cancelled) { return; } final Response response = queue.poll(); if (response != null) { downstream.onNext(response); emitted++; } else if (done) { // queue is empty and we received a completion (onError/onComplete) notification from upstream forwardDoneEvent(); return; } else { // Queue is empty but upstream is not done yet. break; } } private void drain() { Response response; try { while (pendingRequests > 0 && (response = queue.poll()) != null) { downstream.onNext(response); // Forward response pendingRequests--; // Check if an onError/onComplete from upstream arrived. 
if (emitted == immutablePendingRequests) { if (cancelled) { return; } if (upstreamCompleted && queue.isEmpty()) { if (pendingRequests != Long.MIN_VALUE) { if (done && queue.isEmpty()) { forwardDoneEvent(); return; } } if (emitted != 0) { pendingRequests.addAndGet(-emitted); } // Check to see if another thread asked for drain missed = missedDrain.addAndGet(-missed); if (missed == 0) { // Nop, we can exit. break; } } } private void forwardDoneEvent() { final Throwable immutableError = error; if (immutableError != null) { downstream.onError(immutableError); } else { downstream.onComplete(); } cancel(); } } finally { spaceAvailable.signalAll(); } } @Override public void onNext(final Response response) { lock.lock(); try { while (queue.size() >= 32) { try { if (!spaceAvailable.await(writeTimeoutMillis, TimeUnit.MILLISECONDS)) { if (queue.offer(response, writeTimeoutMillis, TimeUnit.MILLISECONDS)) { drain(); } else { // If we've gotten here, then the write timed out. downstream.onError(new ClosedChannelException()); cancel(); onError(new ClosedChannelException().fillInStackTrace()); return; } } catch (InterruptedException e) { downstream.onError(e); cancel(); return; } } queue.add(response); drain(); } finally { lock.unlock(); onError(e); } } @Override public void onError(Throwable t) { downstream.onError(t); cancel(); public void onError(final Throwable error) { this.error = error; done = true; drain(); } @Override public void onComplete() { lock.lock(); try { // the onComplete() will be forwarded downstream once the pending messages have been flushed upstreamCompleted = true; done = true; drain(); } finally { lock.unlock(); } } @Override public void cancel() { lock.lock(); try { pendingRequests = Long.MIN_VALUE; } finally { lock.unlock(); } if (subscription != null) { cancelled = true; subscription.cancel(); } } } } opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -53,7 +53,7 @@ import org.forgerock.opendj.ldap.AddressMask; import org.forgerock.opendj.ldap.DN; import org.forgerock.opendj.ldap.LDAPClientContext; import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener; import org.forgerock.opendj.ldap.LDAPClientContextEventListener; import org.forgerock.opendj.ldap.LDAPListener; import org.forgerock.opendj.ldap.LdapException; import org.forgerock.opendj.ldap.ResultCode; @@ -641,7 +641,7 @@ LDAPClientContext clientContext) throws LdapException { final LDAPClientConnection2 conn = canAccept(clientContext); clientConnections.add(conn); clientContext.addConnectionEventListener(new ConnectionEventListener() { clientContext.addListener(new LDAPClientContextEventListener() { @Override public void handleConnectionError(final LDAPClientContext context, final Throwable error) { clientConnections.remove(conn);