mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
14.26.2016 376465285de63517e1fad3d38cf671c7f2632221
OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

Fix a theoretical (not reproducible) race condition which could cause a
server connection to become deaf to incoming packets.
1 files modified
24 ■■■■■ changed files
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java 24 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -249,31 +249,39 @@
            // Handles one incoming message: passes the message carried by ctx to the subscriber and,
            // when the outstanding demand tracked in pendingRequests is used up, suspends the
            // filter-chain context and remembers it in suspendedCtx.
            // NOTE(review): this excerpt looks like a rendered diff in which removed and added lines are
            // interleaved (braces do not balance and some conditions appear twice) -- check the real
            // file before relying on the exact control flow shown here.
            NextAction handleRead(final FilterChainContext ctx) {
                if (pendingRequests.get() == 1) {
                    subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
                    // This synchronized block ensures that suspending the context is done atomically
                    // with pendingRequests being set to 0
                    synchronized (this) {
                        if (BackpressureHelper.producedCancel(pendingRequests, 1) == 0) {
                            this.suspendedCtx = ctx;
                        // Another request() might have added some pendingRequests in the meantime
                        if (pendingRequests.compareAndSet(1, 0)) {
                            ctx.suspend();
                            this.suspendedCtx = ctx;
                            return ctx.getSuspendAction();
                        }
                    }
                } else if (BackpressureHelper.producedCancel(pendingRequests, 1) >= 0) {
                    subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
                    return ctx.getStopAction();
                }
                // NOTE(review): Long.MIN_VALUE is presumably the "cancelled" marker returned by
                // BackpressureHelper -- confirm against that helper's contract.
                if (BackpressureHelper.producedCancel(pendingRequests, 1) == Long.MIN_VALUE) {
                    ctx.suspend();
                    return ctx.getSuspendAction();
                }
                subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
                return ctx.getStopAction();
            }
            // Adds n to the outstanding demand held in pendingRequests and, if a context was
            // suspended (suspendedCtx is non-null), resumes it and clears the reference.
            // NOTE(review): this excerpt looks like a rendered diff in which removed and added lines are
            // interleaved (braces do not balance and addCancel() appears several times) -- check the
            // real file before relying on the exact control flow shown here.
            @Override
            public void request(final long n) {
                if (pendingRequests.get() == 0) {
                if (BackpressureHelper.addCancel(pendingRequests, n) == 0) {
                    // This synchronized block, coupled with the one in handleRead(), ensures atomicity
                    // by "waiting" until the context has been suspended
                    synchronized (this) {
                        if (BackpressureHelper.addCancel(pendingRequests, n) == 0) {
                        // On startup, pendingRequests = 0 and suspendedCtx is null
                        if (suspendedCtx != null) {
                            suspendedCtx.resume();
                            // Clear the reference so the same context is not resumed a second time
                            suspendedCtx = null;
                            return;
                        }
                    }
                }
                BackpressureHelper.addCancel(pendingRequests, n);
            }
            public void onError(final Throwable error) {