mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
21.44.2016 b59e161bd2d26df6023efea77353c8f3f61dd37b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.forgerock.opendj.reactive;
 
import static com.forgerock.reactive.RxJavaStreams.*;
import static org.forgerock.util.Utils.closeSilently;
 
import java.util.concurrent.atomic.AtomicReference;
 
import org.forgerock.opendj.ldap.Connection;
import org.forgerock.opendj.ldap.IntermediateResponseHandler;
import org.forgerock.opendj.ldap.LDAPConnectionFactory;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.SearchResultHandler;
import org.forgerock.opendj.ldap.requests.CompareRequest;
import org.forgerock.opendj.ldap.requests.ExtendedRequest;
import org.forgerock.opendj.ldap.requests.Request;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.responses.IntermediateResponse;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
import org.forgerock.opendj.ldif.ChangeRecord;
import org.forgerock.util.promise.ExceptionHandler;
import org.forgerock.util.promise.ResultHandler;
import org.forgerock.util.promise.RuntimeExceptionHandler;
 
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
 
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableEmitter.BackpressureMode;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.functions.Action;
 
/** Forward {@link Request} to another server reached through the {@link LDAPConnectionFactory}. */
final class ProxyToHandler implements ReactiveHandler<LDAPClientConnection2, Request, Stream<Response>> {
 
    private final LDAPConnectionFactory connectionFactory;
 
    ProxyToHandler(final LDAPConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }
 
    @Override
    public Single<Stream<Response>> handle(final LDAPClientConnection2 context, final Request request) {
        return singleFromPublisher(Flowable.create(new FlowableOnSubscribe<Stream<Response>>() {
            @Override
            public void subscribe(final FlowableEmitter<Stream<Response>> emitter) throws Exception {
                final AtomicReference<Connection> connectionHolder = new AtomicReference<Connection>();
 
                connectionFactory.getConnectionAsync().thenOnResult(new ResultHandler<Connection>() {
                    @Override
                    public void handleResult(final Connection connection) {
                        connectionHolder.set(connection);
                        emitter.onNext(executeRequest(connection, request));
                        emitter.onComplete();
                    }
                }).thenOnException(new ExceptionHandler<LdapException>() {
                    @Override
                    public void handleException(LdapException exception) {
                        emitter.onError(exception);
                    }
                }).thenOnRuntimeException(new RuntimeExceptionHandler() {
                    @Override
                    public void handleRuntimeException(RuntimeException exception) {
                        emitter.onError(exception);
                    }
                });
            }
        }, BackpressureMode.ERROR));
    }
 
    private Stream<Response> executeRequest(final Connection connection, final Request request) {
        return streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
            @Override
            public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
                // add/delete/modifyDN/modifyReq
                final PublisherAdaptor adapter = new PublisherAdaptor(emitter);
                if (request instanceof ChangeRecord) {
                    connection.applyChangeAsync((ChangeRecord) request, adapter).thenOnResult(adapter)
                            .thenOnException(adapter).thenOnRuntimeException(adapter);
                } else if (request instanceof CompareRequest) {
                    connection.compareAsync((CompareRequest) request, adapter).thenOnResult(adapter)
                            .thenOnException(adapter).thenOnRuntimeException(adapter);
                } else if (request instanceof ExtendedRequest) {
                    connection.compareAsync((CompareRequest) request, adapter).thenOnResult(adapter)
                            .thenOnException(adapter).thenOnRuntimeException(adapter);
                } else if (request instanceof SearchRequest) {
                    connection.searchAsync((SearchRequest) request, adapter).thenOnResult(adapter)
                            .thenOnException(adapter).thenOnRuntimeException(adapter);
                } else {
                    emitter.onError(new IllegalArgumentException("Unsupported request type"));
                }
            }
        }, BackpressureMode.ERROR).doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                closeSilently(connection);
            }
        }));
    }
 
    /** Adaptor forwarding events from the SDK handler to the emitter. */
    private final class PublisherAdaptor implements IntermediateResponseHandler, SearchResultHandler,
            ResultHandler<Result>, ExceptionHandler<Exception>, RuntimeExceptionHandler {
 
        private final FlowableEmitter<Response> emitter;
 
        PublisherAdaptor(final FlowableEmitter<Response> emitter) {
            this.emitter = emitter;
        }
 
        private boolean handle(final Response response) {
            emitter.onNext(response);
            return true;
        }
 
        @Override
        public boolean handleEntry(SearchResultEntry entry) {
            return handle(entry);
        }
 
        @Override
        public boolean handleReference(SearchResultReference reference) {
            return handle(reference);
        }
 
        @Override
        public boolean handleIntermediateResponse(IntermediateResponse response) {
            return handle(response);
        }
 
        @Override
        public void handleResult(Result result) {
            if (result != null) {
                handle(result);
            }
            emitter.onComplete();
        }
 
        @Override
        public void handleRuntimeException(RuntimeException exception) {
            emitter.onError(exception);
        }
 
        @Override
        public void handleException(Exception exception) {
            emitter.onError(exception);
        }
    }
}