mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
14.51.2016 5e424f9a2e8e3e48a898b02ed85cb6fe9cc1fd35
OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

Fix: force blocking mode for client socket
3 files modified
69 ■■■■ changed files
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java 3 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java 59 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java 7 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java
@@ -12,7 +12,7 @@
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2010 Sun Microsystems, Inc.
 * Portions Copyright 2011-2015 ForgeRock AS.
 * Portions Copyright 2011-2016 ForgeRock AS.
 */
package org.forgerock.opendj.grizzly;
@@ -105,6 +105,7 @@
        private GrizzlyLDAPConnection adaptConnection(final Connection<?> connection) {
            configureConnection(connection, logger, options);
            connection.configureBlocking(true);
            final GrizzlyLDAPConnection ldapConnection =
                    new GrizzlyLDAPConnection(connection, GrizzlyLDAPConnectionFactory.this);
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -238,58 +238,59 @@
        final class GrizzlyBackpressureSubscription implements Subscription {
            private final AtomicLong pendingRequests = new AtomicLong();
            private Subscriber<? super LdapRequestEnvelope> subscriber;
            volatile FilterChainContext ctx;
            private final Subscriber<? super LdapRequestEnvelope> subscriber;
            private FilterChainContext suspendedCtx;
            GrizzlyBackpressureSubscription(Subscriber<? super LdapRequestEnvelope> subscriber) {
            GrizzlyBackpressureSubscription(final Subscriber<? super LdapRequestEnvelope> subscriber) {
                this.subscriber = subscriber;
                subscriber.onSubscribe(this);
            }
            NextAction handleRead(final FilterChainContext ctx) {
                final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
                if (sub == null) {
                    // Subscription cancelled. Stop reading
                    ctx.suspend();
                    return ctx.getSuspendAction();
                if (pendingRequests.get() == 1) {
                    subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
                    synchronized (this) {
                        if (BackpressureHelper.producedCancel(pendingRequests, 1) == 0) {
                            this.suspendedCtx = ctx;
                            ctx.suspend();
                            return ctx.getSuspendAction();
                        }
                    }
                } else if (BackpressureHelper.producedCancel(pendingRequests, 1) >= 0) {
                    subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
                }
                sub.onNext((LdapRequestEnvelope) ctx.getMessage());
                if (BackpressureHelper.produced(pendingRequests, 1) > 0) {
                    return ctx.getStopAction();
                }
                this.ctx = ctx;
                ctx.suspend();
                return ctx.getSuspendAction();
                return ctx.getStopAction();
            }
            @Override
            public void request(long n) {
                final FilterChainContext immutableRef = ctx;
                if (BackpressureHelper.add(pendingRequests, n) == 0 && immutableRef != null) {
                    immutableRef.resumeNext();
                    ctx = null;
            public void request(final long n) {
                if (pendingRequests.get() == 0) {
                    synchronized (this) {
                        if (BackpressureHelper.addCancel(pendingRequests, n) == 0) {
                            suspendedCtx.resume();
                            suspendedCtx = null;
                            return;
                        }
                    }
                }
                BackpressureHelper.addCancel(pendingRequests, n);
            }
            public void onError(final Throwable error) {
                final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
                if (sub != null) {
                    subscriber = null;
                    sub.onError(error);
                if (pendingRequests.getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
                    subscriber.onError(error);
                }
            }
            public void onComplete() {
                final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
                if (sub != null) {
                    subscriber = null;
                    sub.onComplete();
                if (pendingRequests.getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
                    subscriber.onComplete();
                }
            }
            @Override
            public void cancel() {
                subscriber = null;
                pendingRequests.set(Long.MIN_VALUE);
            }
        }
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -55,10 +55,7 @@
    @Override
    public void completed(final Object result) {
        final Subscription sub = upstream;
        if (sub != null) {
            sub.request(1);
        }
        upstream.request(1);
    }
    @Override
@@ -79,14 +76,12 @@
    @Override
    public void onError(final Throwable error) {
        upstream.cancel();
        upstream = null;
        downstream.onError(error);
    }
    @Override
    public void onComplete() {
        upstream.cancel();
        upstream = null;
        downstream.onComplete();
    }
}