| | |
| | | import java.security.cert.Certificate; |
| | | import java.util.Collection; |
| | | import java.util.Iterator; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Queue; |
| | | import java.util.concurrent.BlockingQueue; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | import java.util.concurrent.locks.Condition; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | |
| | | import javax.net.ssl.SSLEngine; |
| | | import javax.net.ssl.SSLPeerUnverifiedException; |
| | |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.DN; |
| | | import org.forgerock.opendj.ldap.LDAPClientContext; |
| | | import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener; |
| | | import org.forgerock.opendj.ldap.LDAPClientContextEventListener; |
| | | import org.forgerock.opendj.ldap.ResultCode; |
| | | import org.forgerock.opendj.ldap.requests.UnbindRequest; |
| | | import org.forgerock.opendj.ldap.responses.CompareResult; |
| | |
| | | import org.forgerock.opendj.ldap.responses.Responses; |
| | | import org.forgerock.opendj.ldap.responses.Result; |
| | | import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope; |
| | | import org.forgerock.util.Reject; |
| | | import org.opends.server.api.ClientConnection; |
| | | import org.opends.server.api.ConnectionHandler; |
| | | import org.opends.server.core.AbandonOperationBasis; |
| | |
| | | import org.opends.server.types.SearchResultEntry; |
| | | import org.opends.server.types.SearchResultReference; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.reactivestreams.Processor; |
| | | import org.reactivestreams.Publisher; |
| | | import org.reactivestreams.Subscriber; |
| | | import org.reactivestreams.Subscription; |
| | |
| | | } |
| | | |
| | | connectionID = DirectoryServer.newConnectionAccepted(this); |
| | | clientContext.addConnectionEventListener(new ConnectionEventListener() { |
| | | clientContext.addListener(new LDAPClientContextEventListener() { |
| | | @Override |
| | | public void handleConnectionError(LDAPClientContext context, Throwable error) { |
| | | if (error instanceof LocalizableException) { |
| | |
| | | } |
| | | |
| | | /** Upstream -> BlockingBackpressureSubscription -> Downstream. */ |
| | | private final class BlockingBackpressureSubscription |
| | | implements Subscription, Publisher<Response>, Subscriber<Response> { |
| | | private long pendingRequests; |
| | | private final Queue<Response> queue = new LinkedList<>(); |
| | | private final Lock lock = new ReentrantLock(); |
| | | private final Condition spaceAvailable = lock.newCondition(); |
| | | private final class BlockingBackpressureSubscription implements Subscription, Processor<Response, Response> { |
| | | private final AtomicLong pendingRequests = new AtomicLong(); |
| | | private final AtomicInteger missedDrain = new AtomicInteger(); |
| | | private final BlockingQueue<Response> queue = new LinkedBlockingQueue<>(32); |
| | | private final Publisher<Response> upstream; |
| | | private final long writeTimeoutMillis; |
| | | private boolean upstreamCompleted; |
| | | private Subscription subscription; |
| | | private Subscriber<? super Response> downstream; |
| | | private volatile boolean done; |
| | | private Throwable error; |
| | | private volatile boolean cancelled; |
| | | |
| | | BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) { |
| | | this.upstream = upstream; |
| | |
| | | |
| | | @Override |
| | | public void subscribe(final Subscriber<? super Response> subscriber) { |
| | | Reject.ifNull(subscriber); |
| | | if (downstream != null) { |
| | | // This publisher only supports one subscriber. |
| | | return; |
| | | } |
| | | downstream = subscriber; |
| | | subscriber.onSubscribe(this); |
| | | upstream.subscribe(this); |
| | | subscriber.onSubscribe(/* Subscription */ this); |
| | | upstream.subscribe(/* Subscriber */ this); |
| | | } |
| | | |
| | | @Override |
| | | public void onSubscribe(final Subscription s) { |
| | | if ( subscription != null) { |
| | | if (subscription != null) { |
| | | s.cancel(); |
| | | return; |
| | | } |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void request(long n) { |
| | | lock.lock(); |
| | | try { |
| | | if (pendingRequests != Long.MIN_VALUE) { |
| | | pendingRequests += n; |
| | | drain(); |
| | | public void request(final long n) { |
| | | // Known and accepted deviation from the reactive-streams contract: overflow of the pending counter |
| | | // (pendingRequests + n > Long.MAX_VALUE) is not guarded against, for performance reasons, because it |
| | | // should never happen in the context where this class is used. |
| | | if (n != Long.MAX_VALUE) { |
| | | pendingRequests.addAndGet(n); |
| | | } else { |
| | | // Long.MAX_VALUE replaces the counter outright instead of being added to it. |
| | | pendingRequests.set(Long.MAX_VALUE); |
| | | } |
| | | // Try to emit queued responses now that the demand has changed. |
| | | drain(); |
| | | } |
| | | |
| | | // Taken from |
| | | // https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#backpressure-and-cancellation |
| | | private void drain() { |
| | | if (missedDrain.getAndIncrement() != 0) { |
| | | // Another thread is already executing this drain method. |
| | | return; |
| | | } |
| | | |
| | | int missed = 1; |
| | | |
| | | for (;;) { |
| | | final long immutablePendingRequests = pendingRequests.get(); |
| | | long emitted = 0L; |
| | | while (emitted != immutablePendingRequests) { |
| | | // Check if we should early exit because of cancellation |
| | | if (cancelled) { |
| | | return; |
| | | } |
| | | |
| | | final Response response = queue.poll(); |
| | | if (response != null) { |
| | | downstream.onNext(response); |
| | | emitted++; |
| | | } else if (done) { |
| | | // queue is empty and we received a completion (onError/onComplete) notification from upstream |
| | | forwardDoneEvent(); |
| | | return; |
| | | } else { |
| | | // Queue is empty but upstream is not done yet. |
| | | break; |
| | | } |
| | | } |
| | | } finally { |
| | | lock.unlock(); |
| | | |
| | | // Check if an onError/onComplete from upstream arrived. |
| | | if (emitted == immutablePendingRequests) { |
| | | if (cancelled) { |
| | | return; |
| | | } |
| | | |
| | | if (done && queue.isEmpty()) { |
| | | forwardDoneEvent(); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | if (emitted != 0) { |
| | | pendingRequests.addAndGet(-emitted); |
| | | } |
| | | |
| | | // Check to see if another thread asked for drain |
| | | missed = missedDrain.addAndGet(-missed); |
| | | if (missed == 0) { |
| | | // Nop, we can exit. |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void drain() { |
| | | Response response; |
| | | try { |
| | | while (pendingRequests > 0 && (response = queue.poll()) != null) { |
| | | downstream.onNext(response); |
| | | // Forward response |
| | | pendingRequests--; |
| | | } |
| | | if (upstreamCompleted && queue.isEmpty()) { |
| | | if (pendingRequests != Long.MIN_VALUE) { |
| | | downstream.onComplete(); |
| | | } |
| | | cancel(); |
| | | } |
| | | } finally { |
| | | spaceAvailable.signalAll(); |
| | | private void forwardDoneEvent() { |
| | | final Throwable immutableError = error; |
| | | if (immutableError != null) { |
| | | downstream.onError(immutableError); |
| | | } else { |
| | | downstream.onComplete(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void onNext(final Response response) { |
| | | lock.lock(); |
| | | try { |
| | | while (queue.size() >= 32) { |
| | | try { |
| | | if (!spaceAvailable.await(writeTimeoutMillis, TimeUnit.MILLISECONDS)) { |
| | | // If we've gotten here, then the write timed out. |
| | | downstream.onError(new ClosedChannelException()); |
| | | cancel(); |
| | | return; |
| | | } |
| | | } catch (InterruptedException e) { |
| | | downstream.onError(e); |
| | | cancel(); |
| | | return; |
| | | } |
| | | if (queue.offer(response, writeTimeoutMillis, TimeUnit.MILLISECONDS)) { |
| | | drain(); |
| | | } else { |
| | | // If we've gotten here, then the write timed out. |
| | | onError(new ClosedChannelException().fillInStackTrace()); |
| | | return; |
| | | } |
| | | queue.add(response); |
| | | drain(); |
| | | } finally { |
| | | lock.unlock(); |
| | | } catch (InterruptedException e) { |
| | | onError(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void onError(Throwable t) { |
| | | downstream.onError(t); |
| | | cancel(); |
| | | public void onError(final Throwable error) { |
| | | this.error = error; |
| | | done = true; |
| | | drain(); |
| | | } |
| | | |
| | | @Override |
| | | public void onComplete() { |
| | | lock.lock(); |
| | | try { |
| | | // the onComplete() will be forwarded downstream once the pending messages have been flushed |
| | | upstreamCompleted = true; |
| | | drain(); |
| | | } finally { |
| | | lock.unlock(); |
| | | } |
| | | done = true; |
| | | drain(); |
| | | } |
| | | |
| | | @Override |
| | | public void cancel() { |
| | | lock.lock(); |
| | | try { |
| | | pendingRequests = Long.MIN_VALUE; |
| | | } finally { |
| | | lock.unlock(); |
| | | } |
| | | if (subscription != null) { |
| | | subscription.cancel(); |
| | | } |
| | | cancelled = true; |
| | | subscription.cancel(); |
| | | } |
| | | } |
| | | } |