| | |
| | | import static com.forgerock.reactive.RxJavaStreams.*; |
| | | import static org.forgerock.opendj.grizzly.GrizzlyUtils.configureConnection; |
| | | import static org.forgerock.opendj.io.LDAP.*; |
| | | import static org.forgerock.opendj.ldap.responses.Responses.newGenericExtendedResult; |
| | | import static org.forgerock.opendj.ldap.spi.LdapMessages.newResponseMessage; |
| | | |
| | | import java.io.IOException; |
| | | import java.net.InetSocketAddress; |
| | | import java.util.ArrayList; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.CancellationException; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | |
| | | import javax.net.ssl.SSLEngine; |
| | |
| | | import org.forgerock.opendj.ldap.responses.ExtendedResult; |
| | | import org.forgerock.opendj.ldap.responses.IntermediateResponse; |
| | | import org.forgerock.opendj.ldap.responses.Response; |
| | | import org.forgerock.opendj.ldap.responses.Responses; |
| | | import org.forgerock.opendj.ldap.responses.Result; |
| | | import org.forgerock.opendj.ldap.responses.SearchResultEntry; |
| | | import org.forgerock.opendj.ldap.responses.SearchResultReference; |
| | |
| | | import org.forgerock.util.Function; |
| | | import org.forgerock.util.Options; |
| | | import org.forgerock.util.Reject; |
| | | import org.forgerock.util.promise.ExceptionHandler; |
| | | import org.forgerock.util.promise.PromiseImpl; |
| | | import org.forgerock.util.promise.ResultHandler; |
| | | import org.forgerock.util.promise.RuntimeExceptionHandler; |
| | | import org.glassfish.grizzly.Connection; |
| | | import org.glassfish.grizzly.EmptyCompletionHandler; |
| | | import org.glassfish.grizzly.filterchain.BaseFilter; |
| | | import org.glassfish.grizzly.filterchain.Filter; |
| | | import org.glassfish.grizzly.filterchain.FilterChain; |
| | |
| | | |
| | | import com.forgerock.reactive.Action; |
| | | import com.forgerock.reactive.Completable; |
| | | import com.forgerock.reactive.Consumer; |
| | | import com.forgerock.reactive.ReactiveHandler; |
| | | import com.forgerock.reactive.Stream; |
| | | |
| | |
| | | @Override |
| | | public Publisher<Void> apply(final LdapRequestEnvelope rawRequest) throws Exception { |
| | | if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) { |
| | | clientContext.notifyConnectionClosed(rawRequest); |
| | | clientContext.notifyConnectionClosedSilently(rawRequest); |
| | | return emptyStream(); |
| | | } |
| | | Stream<Response> response; |
| | |
| | | }, maxConcurrentRequests).onErrorResumeWith(new Function<Throwable, Publisher<Void>, Exception>() { |
| | | @Override |
| | | public Publisher<Void> apply(Throwable error) throws Exception { |
| | | clientContext.notifyErrorAndCloseSilently(error); |
| | | clientContext.notifyErrorSilently(error); |
| | | // Swallow the error to prevent the subscribe() below to report it on the console. |
| | | return emptyStream(); |
| | | } |
| | | }).onComplete(new Action() { |
| | | @Override |
| | | public void run() throws Exception { |
| | | clientContext.notifyConnectionClosed(null); |
| | | connection.closeSilently(); |
| | | } |
| | | }).subscribe(); |
| | | return ctx.getStopAction(); |
| | |
| | | } |
| | | |
| | | private final Connection<?> connection; |
| | | private final AtomicBoolean isClosed = new AtomicBoolean(false); |
| | | private final List<ConnectionEventListener> listeners = new LinkedList<>(); |
| | | private volatile boolean isClosed; |
| | | private final List<ConnectionEventListener> connectionEventListeners = new LinkedList<>(); |
| | | private SaslServer saslServer; |
| | | private GrizzlyBackpressureSubscription downstream; |
| | | |
| | |
| | | |
| | | @Override |
| | | public NextAction handleClose(final FilterChainContext ctx) { |
| | | if (isClosed.compareAndSet(false, true)) { |
| | | isClosed = true; |
| | | final GrizzlyBackpressureSubscription immutableRef = downstream; |
| | | if (immutableRef != null) { |
| | | downstream.onComplete(); |
| | | immutableRef.onComplete(); |
| | | } |
| | | } |
| | | notifyConnectionClosedSilently(null); |
| | | return ctx.getStopAction(); |
| | | } |
| | | |
| | |
| | | @Override |
| | | public void addConnectionEventListener(ConnectionEventListener listener) { |
| | | Reject.ifNull(listener, "listener must not be null"); |
| | | listeners.add(listener); |
| | | synchronized (connectionEventListeners) { |
| | | connectionEventListeners.add(listener); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void disconnect() { |
| | | if (isClosed.compareAndSet(false, true)) { |
| | | try { |
| | | for (ConnectionEventListener listener : listeners) { |
| | | listener.handleConnectionDisconnected(this, null, null); |
| | | } |
| | | } finally { |
| | | closeConnection(); |
| | | } |
| | | } |
| | | notifySilentlyConnectionDisconnected(null, null); |
| | | connection.closeSilently(); |
| | | } |
| | | |
| | | private void closeConnection() { |
| | | if (downstream != null) { |
| | | downstream.cancel(); |
| | | downstream = null; |
| | | } |
| | | connection.closeSilently(); |
| | | } |
| | | |
| | | @Override |
| | | public void disconnect(final ResultCode resultCode, final String diagnosticMessage) { |
| | | // Close this connection context. |
| | | if (isClosed.compareAndSet(false, true)) { |
| | | sendUnsolicitedNotification(Responses.newGenericExtendedResult(resultCode) |
| | | notifySilentlyConnectionDisconnected(resultCode, diagnosticMessage); |
| | | sendUnsolicitedNotification( |
| | | newGenericExtendedResult(resultCode) |
| | | .setOID(OID_NOTICE_OF_DISCONNECTION) |
| | | .setDiagnosticMessage(diagnosticMessage)); |
| | | try { |
| | | for (ConnectionEventListener listener : listeners) { |
| | | listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage); |
| | | } |
| | | } finally { |
| | | .setDiagnosticMessage(diagnosticMessage) |
| | | ).subscribe(new Action() { |
| | | @Override |
| | | public void run() throws Exception { |
| | | closeConnection(); |
| | | } |
| | | }, new Consumer<Throwable>() { |
| | | @Override |
| | | public void accept(final Throwable error) throws Exception { |
| | | closeConnection(); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | private void notifyConnectionClosed(final LdapRequestEnvelope unbindRequest) { |
| | | private void notifyConnectionClosedSilently(final LdapRequestEnvelope unbindRequest) { |
| | | // Close this connection context. |
| | | if (isClosed.compareAndSet(false, true)) { |
| | | if (unbindRequest == null) { |
| | | notifySilentlyConnectionClosed(null); |
| | | } else { |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void notifySilentlyConnectionDisconnected(final ResultCode resultCode, final String diagnosticMessage) { |
| | | for (final ConnectionEventListener listener : getAndClearListeners()) { |
| | | try { |
| | | listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage); |
| | | } catch (Exception e) { |
| | | logger.traceException(e); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void notifySilentlyConnectionClosed(final UnbindRequest unbindRequest) { |
| | | try { |
| | | for (final ConnectionEventListener listener : listeners) { |
| | | for (final ConnectionEventListener listener : getAndClearListeners()) { |
| | | try { |
| | | listener.handleConnectionClosed(this, unbindRequest); |
| | | } catch (Exception e) { |
| | | logger.traceException(e); |
| | | } |
| | | } |
| | | } finally { |
| | | closeConnection(); |
| | | } |
| | | } |
| | | |
| | | private void notifyErrorAndCloseSilently(final Throwable error) { |
| | | // Close this connection context. |
| | | if (isClosed.compareAndSet(false, true)) { |
| | | try { |
| | | for (final ConnectionEventListener listener : listeners) { |
| | | private void notifyErrorSilently(final Throwable error) { |
| | | for (final ConnectionEventListener listener : getAndClearListeners()) { |
| | | try { |
| | | listener.handleConnectionError(this, error); |
| | | } catch (Exception e) { |
| | | logger.traceException(e); |
| | | } |
| | | } |
| | | } finally { |
| | | closeConnection(); |
| | | } |
| | | |
| | | private ArrayList<ConnectionEventListener> getAndClearListeners() { |
| | | synchronized (connectionEventListeners) { |
| | | final ArrayList<ConnectionEventListener> listeners = new ArrayList<>(connectionEventListeners); |
| | | connectionEventListeners.clear(); |
| | | return listeners; |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public boolean isClosed() { |
| | | return isClosed.get(); |
| | | return isClosed; |
| | | } |
| | | |
| | | @SuppressWarnings({ "unchecked", "rawtypes" }) |
| | | @Override |
| | | public Completable sendUnsolicitedNotification(final ExtendedResult notification) { |
| | | // We use a promise so that the notification is sent even if the Completable is not subscribed. |
| | | final PromiseImpl<Boolean, Exception> promise = PromiseImpl.create(); |
| | | connection.write(newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, 0, notification), |
| | | new EmptyCompletionHandler() { |
| | | @Override |
| | | public void cancelled() { |
| | | promise.handleException(new CancellationException()); |
| | | } |
| | | |
| | | @Override |
| | | public void sendUnsolicitedNotification(final ExtendedResult notification) { |
| | | connection.write(newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, 0, notification)); |
| | | public void failed(Throwable throwable) { |
| | | if (throwable instanceof Exception) { |
| | | promise.handleException((Exception) throwable); |
| | | } else { |
| | | promise.handleException(new Exception(throwable)); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void completed(Object result) { |
| | | promise.handleResult(Boolean.TRUE); |
| | | } |
| | | }); |
| | | return newCompletable(new Completable.Emitter() { |
| | | @Override |
| | | public void subscribe(final Completable.Subscriber s) throws Exception { |
| | | promise.thenOnResult(new ResultHandler<Boolean>() { |
| | | @Override |
| | | public void handleResult(Boolean result) { |
| | | s.onComplete(); |
| | | } |
| | | }).thenOnException(new ExceptionHandler<Exception>() { |
| | | @Override |
| | | public void handleException(Exception exception) { |
| | | s.onError(exception); |
| | | } |
| | | }).thenOnRuntimeException(new RuntimeExceptionHandler() { |
| | | @Override |
| | | public void handleRuntimeException(RuntimeException exception) { |
| | | s.onError(exception); |
| | | } |
| | | }); |
| | | } |
| | | }); |
| | | } |
| | | } |
| | | } |