From 464f6dd52a1eeeef0f7b38d4dc1840501818a36f Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
/dev/null | 798 --------
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java | 1
opendj-grizzly/pom.xml | 4
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/SaslFilter.java | 25
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java | 4
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java | 1742 ++++++++---------
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 3273 +++++++++++++++------------------
7 files changed, 2,279 insertions(+), 3,568 deletions(-)
diff --git a/opendj-grizzly/pom.xml b/opendj-grizzly/pom.xml
index 85d9a2d..90333fb 100644
--- a/opendj-grizzly/pom.xml
+++ b/opendj-grizzly/pom.xml
@@ -69,10 +69,6 @@
<artifactId>forgerock-build-tools</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.forgerock.http</groupId>
- <artifactId>chf-http-core</artifactId>
- </dependency>
</dependencies>
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
index b58139b..a1a7c86 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -118,7 +118,6 @@
default:
rawDn = null;
protocolVersion = -1;
-
}
return LdapMessages.newRawMessage(messageType, messageId, protocolVersion, rawDn,
rawDn != null ? decodeOptions.getSchemaResolver().resolveSchema(rawDn) : null, reader);
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/SaslFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/SaslFilter.java
index f188de3..90734d2 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/SaslFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/SaslFilter.java
@@ -25,6 +25,7 @@
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.memory.Buffers;
+import org.glassfish.grizzly.memory.BuffersBuffer;
import org.glassfish.grizzly.memory.HeapMemoryManager;
final class SaslFilter extends BaseFilter {
@@ -82,17 +83,25 @@
private Buffer wrap(final FilterChainContext ctx, final Buffer buffer) throws SaslException {
final SaslServer server = SaslUtils.getSaslServer(ctx.getConnection());
+ final Buffer contentBuffer;
if (buffer.hasArray()) {
- return Buffers.wrap(ctx.getMemoryManager(),
+ contentBuffer = Buffers.wrap(ctx.getMemoryManager(),
server.wrap(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()));
- }
- final Buffer heapBuffer = toHeapBuffer(buffer, buffer.remaining());
- try {
- return Buffers.wrap(ctx.getMemoryManager(), server.wrap(heapBuffer.array(),
- heapBuffer.arrayOffset() + heapBuffer.position(), heapBuffer.remaining()));
+ } else {
+ final Buffer heapBuffer = toHeapBuffer(buffer, buffer.remaining());
+ try {
+ contentBuffer = Buffers.wrap(
+ ctx.getMemoryManager(),
+ server.wrap(heapBuffer.array(), heapBuffer.arrayOffset() + heapBuffer.position(),
+ heapBuffer.remaining()));
- } finally {
- heapBuffer.dispose();
+ } finally {
+ heapBuffer.dispose();
+ }
}
+ final Buffer headerBuffer = ctx.getMemoryManager().allocate(4);
+ headerBuffer.putInt(contentBuffer.limit());
+ headerBuffer.flip();
+ return BuffersBuffer.create(ctx.getMemoryManager(), headerBuffer, contentBuffer);
}
}
diff --git a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java
index d41fd1c..73c026e 100644
--- a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java
+++ b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java
@@ -139,7 +139,7 @@
// Use custom search request.
SearchRequest request = Requests.newSearchRequest(
"uid=user.0,ou=people,o=test", SearchScope.BASE_OBJECT, "(objectclass=*)", "cn");
-//
+
InetSocketAddress serverAddress = getServerSocketAddress();
factories[0][0] = new LDAPConnectionFactory(serverAddress.getHostName(),
serverAddress.getPort(),
@@ -246,7 +246,7 @@
*
* @throws Exception
*/
- @Test(dataProvider = "connectionFactories") //, timeOut = TEST_TIMEOUT_MS)
+ @Test(dataProvider = "connectionFactories", timeOut = TEST_TIMEOUT_MS)
public void testBlockingPromiseNoHandler(ConnectionFactory factory) throws Exception {
final Promise<? extends Connection, LdapException> promise = factory.getConnectionAsync();
final Connection con = promise.get();
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
deleted file mode 100644
index 14db1c6..0000000
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * The contents of this file are subject to the terms of the Common Development and
- * Distribution License (the License). You may not use this file except in compliance with the
- * License.
- *
- * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
- * specific language governing permission and limitations under the License.
- *
- * When distributing Covered Software, include this CDDL Header Notice in each file and include
- * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
- * Header, with the fields enclosed by brackets [] replaced by your own identifying
- * information: "Portions Copyright [year] [name of copyright owner]".
- *
- * Copyright 2016 ForgeRock AS.
- */
-package org.forgerock.opendj.reactive;
-
-import static com.forgerock.reactive.RxJavaStreams.*;
-import static org.forgerock.util.Reject.checkNotNull;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
-
-import org.forgerock.opendj.io.AbstractLDAPMessageHandler;
-import org.forgerock.opendj.io.LDAP;
-import org.forgerock.opendj.ldap.DecodeException;
-import org.forgerock.opendj.ldap.DecodeOptions;
-import org.forgerock.opendj.ldap.LDAPConnectionFactory;
-import org.forgerock.opendj.ldap.requests.AbandonRequest;
-import org.forgerock.opendj.ldap.requests.AddRequest;
-import org.forgerock.opendj.ldap.requests.GenericBindRequest;
-import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
-import org.forgerock.opendj.ldap.requests.ModifyRequest;
-import org.forgerock.opendj.ldap.requests.Request;
-import org.forgerock.opendj.ldap.requests.SearchRequest;
-import org.forgerock.opendj.ldap.requests.UnbindRequest;
-import org.forgerock.opendj.ldap.responses.Response;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
-import org.forgerock.util.Function;
-import org.reactivestreams.Publisher;
-
-import com.forgerock.reactive.ReactiveFilter;
-import com.forgerock.reactive.ReactiveFilter.SimpleReactiveFilter;
-import com.forgerock.reactive.ReactiveHandler;
-import com.forgerock.reactive.RxJavaStreams;
-import com.forgerock.reactive.Single;
-import com.forgerock.reactive.Stream;
-
-import io.reactivex.Flowable;
-import io.reactivex.Scheduler;
-import io.reactivex.schedulers.Schedulers;
-
-/** Maintains a set of standard {@link ReactiveHandler} / {@link ReactiveFilter} which can be used in ldap endpoint. */
-public final class Components {
-
- /**
- * Routes incoming request to dedicated handler.
- *
- * @param <CTX>
- * Context type in which request are processed
- * @param <REQ>
- * Type of routed request
- * @param <REP>
- * Type of routed response
- * @param routes
- * {@link Route} where request will be forwarded to
- * @param defaultRoute
- * If no route can be applied for a specific request, it'll be forwarded to the defaultRoute
- * @return A new {@link ReactiveHandler} routing incoming requests
- */
- public static <CTX, REQ, REP> ReactiveHandler<CTX, REQ, REP> routeTo(final Iterable<Route<CTX, REQ, REP>> routes,
- final ReactiveHandler<CTX, REQ, REP> defaultRoute) {
- /** Routes requests. */
- final class RouterHandler implements ReactiveHandler<CTX, REQ, REP> {
- @Override
- public Single<REP> handle(final CTX context, final REQ request) throws Exception {
- for (final Route<CTX, REQ, REP> route : routes) {
- if (route.predicate.matches(request, context)) {
- return route.handler.handle(context, request);
- }
- }
- return defaultRoute.handle(context, request);
- }
- }
- return new RouterHandler();
- }
-
- /**
- * Allows to transform all aspects of a {@link Request}.
- *
- * @param <CTX>
- * Context type in which request are processed
- * @param requestTransformer
- * Function in charge of performing the {@link Request} transformation
- * @param responseTransformer
- * Function in charge of performing the {@link Response} transformation
- * @return A new policy performing the {@link Request} and {@link Response} transformation.
- */
- public static <CTX> SimpleReactiveFilter<CTX, Request, Stream<Response>> transform(
- final Function<Request, Request, Exception> requestTransformer,
- final Function<Response, Response, Exception> responseTransformer) {
- /** Transforms {@link Request} and {@link Response}. */
- final class TransformFilter extends SimpleReactiveFilter<CTX, Request, Stream<Response>> {
- @Override
- public Single<Stream<Response>> filter(final CTX context, final Request request,
- final ReactiveHandler<CTX, Request, Stream<Response>> next) throws Exception {
- return next.handle(context, requestTransformer.apply(request))
- .map(new Function<Stream<Response>, Stream<Response>, Exception>() {
- @Override
- public Stream<Response> apply(Stream<Response> responses) throws Exception {
- return responses.map(new Function<Response, Response, Exception>() {
- @Override
- public Response apply(Response value) throws Exception {
- return responseTransformer.apply(value);
- }
- });
- }
- });
- }
- }
- return new TransformFilter();
- }
-
- /**
- * Forward {@link Request} to the provided {@link LDAPConnectionFactory}.
- *
- * @param connectionFactory
- * The {@link LDAPConnectionFactory} used to send the forwarded {@link Request}
- * @return a {@link ReactiveHandler} Forwarding {@link Request} to the {@link LDAPConnectionFactory}
- */
- public static ReactiveHandler<LDAPClientConnection2, Request, Stream<Response>> proxyTo(
- LDAPConnectionFactory connectionFactory) {
- return new ProxyToHandler(connectionFactory);
- }
-
- /**
- * {@link ReactiveHandler} responding with the provided Response for all incoming requests.
- *
- * @param <CTX>
- * Context type in which request are processed
- * @param response
- * The {@link Response} to send as response for all {@link Request}
- * @return a {@link ReactiveHandler} replying {@link Response} for all {@link Request}
- */
- public static <CTX> ReactiveHandler<CTX, Request, Stream<Response>> respondWith(final Response response) {
- return new ReactiveHandler<CTX, Request, Stream<Response>>() {
- @Override
- public Single<Stream<Response>> handle(final CTX context, final Request request) {
- if (request instanceof UnbindRequest) {
- return singleFrom(RxJavaStreams.<Response>emptyStream());
- }
- return singleFrom(streamFrom(response));
- }
- };
- }
-
- /**
- * Decodes incoming {@link LdapRawMessage} into a {@link Request}.
- *
- * @param decodeOptions
- * {@link DecodeOptions} used during {@link Request} decoding
- * @return a {@link ReactiveFilter} decoding {@link LdapRawMessage} into {@link Request}
- */
- public static ReactiveFilter<LDAPClientConnection2, LdapRawMessage, Stream<Response>,
- Request, Stream<Response>>
- decodeRequest(final DecodeOptions decodeOptions) {
- return new ReactiveFilter<LDAPClientConnection2, LdapRawMessage, Stream<Response>,
- Request, Stream<Response>>() {
- @Override
- public Single<Stream<Response>> filter(final LDAPClientConnection2 context,
- final LdapRawMessage encodedRequestMessage,
- final ReactiveHandler<LDAPClientConnection2, Request, Stream<Response>> next) throws Exception {
- return newSingle(new Single.Emitter<Request>() {
- @Override
- public void subscribe(final Single.Subscriber<Request> subscriber) throws Exception {
- LDAP.getReader(encodedRequestMessage.getContent(), decodeOptions)
- .readMessage(new AbstractLDAPMessageHandler() {
- @Override
- public void abandonRequest(final int messageID, final AbandonRequest request)
- throws DecodeException, IOException {
- subscriber.onComplete(request);
- }
-
- @Override
- public void addRequest(int messageID, AddRequest request)
- throws DecodeException, IOException {
- subscriber.onComplete(request);
- }
-
- @Override
- public void bindRequest(int messageID, int version, GenericBindRequest request)
- throws DecodeException, IOException {
- subscriber.onComplete(request);
- }
-
- @Override
- public void modifyDNRequest(int messageID, ModifyDNRequest request)
- throws DecodeException, IOException {
- subscriber.onComplete(request);
- }
-
- @Override
- public void modifyRequest(int messageID, ModifyRequest request)
- throws DecodeException, IOException {
- subscriber.onComplete(request);
- }
-
- @Override
- public void searchRequest(int messageID, SearchRequest request)
- throws DecodeException, IOException {
- subscriber.onComplete(request);
- }
-
- @Override
- public void unbindRequest(int messageID, UnbindRequest request)
- throws DecodeException, IOException {
- subscriber.onComplete(request);
- }
- });
- }
- }).flatMap(new Function<Request, Single<Stream<Response>>, Exception>() {
- @Override
- public Single<Stream<Response>> apply(final Request request) throws Exception {
- return next.handle(context, request);
- }
- });
- }
- };
- }
-
- /**
- * Invoke the following {@link ReactiveFilter} from the given {@link Executor}.
- *
- * @param <CTX>
- * Context type in which request are processed
- * @param <REQ>
- * Type of dispatched request
- * @param <REP>
- * Type of dispatched response
- * @param executor
- * The {@link Executor} used to forward the request
- * @return A {@link ReactiveFilter} fowarding {@link Request} through the {@link Executor}
- */
- public static <CTX, REQ, REP> ReactiveFilter<CTX, REQ, REP, REQ, REP> dispatch(final Executor executor) {
- /** Dispatches request into an {@link Executor}. */
- final class DispatchFilter extends SimpleReactiveFilter<CTX, REQ, REP> {
- private final Scheduler executor;
-
- DispatchFilter(final Executor executor) {
- this.executor = Schedulers.from(checkNotNull(executor, "executor must not be null"));
- }
-
- @Override
- public Single<REP> filter(final CTX context, final REQ request,
- final ReactiveHandler<CTX, REQ, REP> next) {
- return singleFromPublisher(Flowable.defer(new Callable<Publisher<REP>>() {
- @Override
- public Publisher<REP> call() throws Exception {
- return next.handle(context, request);
- }
- }).subscribeOn(executor));
- }
- }
- return new DispatchFilter(executor);
- }
-
- private Components() {
- // Prevent instantiation
- }
-}
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 0e14bea..96304a7 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -105,1015 +105,848 @@
import io.reactivex.FlowableOnSubscribe;
/**
- * This class defines an LDAP client connection, which is a type of
- * client connection that will be accepted by an instance of the LDAP
- * connection handler and have its requests decoded by an LDAP request
- * handler.
+ * This class defines an LDAP client connection, which is a type of client connection that will be accepted by an
+ * instance of the LDAP connection handler and have its requests decoded by an LDAP request handler.
*/
-public final class LDAPClientConnection2 extends ClientConnection implements
- TLSCapableConnection, ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>>
-{
- private static final String REACTIVE_OUT = "reactive.out";
+public final class LDAPClientConnection2 extends ClientConnection implements TLSCapableConnection,
+ ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>> {
+ private static final String REACTIVE_OUT = "reactive.out";
-/** The tracer object for the debug logger. */
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+ /** The tracer object for the debug logger. */
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
- /** The time that the last operation was completed. */
- private final AtomicLong lastCompletionTime;
- /** The next operation ID that should be used for this connection. */
- private final AtomicLong nextOperationID;
+ /** The time that the last operation was completed. */
+ private final AtomicLong lastCompletionTime;
+ /** The next operation ID that should be used for this connection. */
+ private final AtomicLong nextOperationID;
- /**
- * Indicates whether the Directory Server believes this connection to be valid
- * and available for communication.
- */
- private volatile boolean connectionValid;
+ /**
+ * Indicates whether the Directory Server believes this connection to be valid and available for communication.
+ */
+ private volatile boolean connectionValid;
- /**
- * Indicates whether this connection is about to be closed. This will be used
- * to prevent accepting new requests while a disconnect is in progress.
- */
- private boolean disconnectRequested;
+ /**
+ * Indicates whether this connection is about to be closed. This will be used to prevent accepting new requests
+ * while a disconnect is in progress.
+ */
+ private boolean disconnectRequested;
- /**
- * Indicates whether the connection should keep statistics regarding the
- * operations that it is performing.
- */
- private final boolean keepStats;
+ /**
+ * Indicates whether the connection should keep statistics regarding the operations that it is performing.
+ */
+ private final boolean keepStats;
- /** The set of all operations currently in progress on this connection. */
- private final ConcurrentHashMap<Integer, Operation> operationsInProgress;
+ /** The set of all operations currently in progress on this connection. */
+ private final ConcurrentHashMap<Integer, Operation> operationsInProgress;
- /**
- * The number of operations performed on this connection. Used to compare with
- * the resource limits of the network group.
- */
- private final AtomicLong operationsPerformed;
+ /**
+ * The number of operations performed on this connection. Used to compare with the resource limits of the network
+ * group.
+ */
+ private final AtomicLong operationsPerformed;
- /** The port on the client from which this connection originated. */
- private final int clientPort;
- /** The LDAP version that the client is using to communicate with the server. */
- private int ldapVersion;
- /** The port on the server to which this client has connected. */
- private final int serverPort;
+ /** The port on the client from which this connection originated. */
+ private final int clientPort;
+ /** The LDAP version that the client is using to communicate with the server. */
+ private int ldapVersion;
+ /** The port on the server to which this client has connected. */
+ private final int serverPort;
- /** The reference to the connection handler that accepted this connection. */
- private final LDAPConnectionHandler2 connectionHandler;
- /** The statistics tracker associated with this client connection. */
- private final LDAPStatistics statTracker;
- private final boolean useNanoTime;
+ /** The reference to the connection handler that accepted this connection. */
+ private final LDAPConnectionHandler2 connectionHandler;
+ /** The statistics tracker associated with this client connection. */
+ private final LDAPStatistics statTracker;
+ private final boolean useNanoTime;
- /** The connection ID assigned to this connection. */
- private final long connectionID;
+ /** The connection ID assigned to this connection. */
+ private final long connectionID;
- /** The lock used to provide threadsafe access to the set of operations in progress. */
- private final Object opsInProgressLock;
+ /** The lock used to provide threadsafe access to the set of operations in progress. */
+ private final Object opsInProgressLock;
- /** The socket channel with which this client connection is associated. */
- private final LDAPClientContext clientContext;
+ /** The socket channel with which this client connection is associated. */
+ private final LDAPClientContext clientContext;
- /** The string representation of the address of the client. */
- private final String clientAddress;
- /** The name of the protocol that the client is using to communicate with the server. */
- private final String protocol;
- /** The string representation of the address of the server to which the client has connected. */
- private final String serverAddress;
+ /** The string representation of the address of the client. */
+ private final String clientAddress;
+ /** The name of the protocol that the client is using to communicate with the server. */
+ private final String protocol;
+ /** The string representation of the address of the server to which the client has connected. */
+ private final String serverAddress;
- /**
- * Creates a new LDAP client connection with the provided information.
- *
- * @param connectionHandler
- * The connection handler that accepted this connection.
- * @param clientContext
- * The socket channel that may be used to communicate with
- * the client.
- * @param protocol String representing the protocol (LDAP or LDAP+SSL).
- * @throws LdapException
- * @throws DirectoryException If SSL initialisation fails.
- */
- LDAPClientConnection2(LDAPConnectionHandler2 connectionHandler, LDAPClientContext clientContext, String protocol,
- boolean keepStats)
- {
- this.connectionHandler = connectionHandler;
- this.clientContext = clientContext;
- opsInProgressLock = new Object();
- ldapVersion = 3;
- lastCompletionTime = new AtomicLong(TimeThread.getTime());
- nextOperationID = new AtomicLong(0);
- connectionValid = true;
- disconnectRequested = false;
- operationsInProgress = new ConcurrentHashMap<>();
- operationsPerformed = new AtomicLong(0);
- this.keepStats = keepStats;
- this.protocol = protocol;
+ /**
+ * Creates a new LDAP client connection with the provided information.
+ *
+ * @param connectionHandler
+ * The connection handler that accepted this connection.
+ * @param clientContext
+ * The socket channel that may be used to communicate with the client.
+ * @param protocol
+ * String representing the protocol (LDAP or LDAP+SSL).
+ * @throws LdapException
+ * @throws DirectoryException
+ * If SSL initialisation fails.
+ */
+ LDAPClientConnection2(LDAPConnectionHandler2 connectionHandler, LDAPClientContext clientContext, String protocol,
+ boolean keepStats) {
+ this.connectionHandler = connectionHandler;
+ this.clientContext = clientContext;
+ opsInProgressLock = new Object();
+ ldapVersion = 3;
+ lastCompletionTime = new AtomicLong(TimeThread.getTime());
+ nextOperationID = new AtomicLong(0);
+ connectionValid = true;
+ disconnectRequested = false;
+ operationsInProgress = new ConcurrentHashMap<>();
+ operationsPerformed = new AtomicLong(0);
+ this.keepStats = keepStats;
+ this.protocol = protocol;
- clientAddress = clientContext.getPeerAddress().getAddress().getHostAddress();
- clientPort = clientContext.getPeerAddress().getPort();
- serverAddress = clientContext.getLocalAddress().getAddress().getHostAddress();
- serverPort = clientContext.getLocalAddress().getPort();
+ clientAddress = clientContext.getPeerAddress().getAddress().getHostAddress();
+ clientPort = clientContext.getPeerAddress().getPort();
+ serverAddress = clientContext.getLocalAddress().getAddress().getHostAddress();
+ serverPort = clientContext.getLocalAddress().getPort();
- statTracker = this.connectionHandler.getStatTracker();
- if (keepStats)
- {
- statTracker.updateConnect();
- this.useNanoTime = DirectoryServer.getUseNanoTime();
- }
- else
- {
- this.useNanoTime = false;
- }
-
- connectionID = DirectoryServer.newConnectionAccepted(this);
- clientContext.onDisconnect(new DisconnectListener()
- {
- @Override
- public void exceptionOccurred(LDAPClientContext context, Throwable error)
- {
- if (error instanceof LocalizableException)
- {
- disconnect(DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
- }
- else
- {
- disconnect(DisconnectReason.PROTOCOL_ERROR, true, null);
- }
- }
-
- @Override
- public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage)
- {
- disconnect(DisconnectReason.SERVER_ERROR, false, null);
- }
-
- @Override
- public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest)
- {
- disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
- }
- });
- }
-
- /**
- * Retrieves the connection ID assigned to this connection.
- *
- * @return The connection ID assigned to this connection.
- */
- @Override
- public long getConnectionID()
- {
- return connectionID;
- }
-
- /**
- * Retrieves the connection handler that accepted this client
- * connection.
- *
- * @return The connection handler that accepted this client
- * connection.
- */
- @Override
- public ConnectionHandler<?> getConnectionHandler()
- {
- return connectionHandler;
- }
-
- /**
- * Retrieves the socket channel that can be used to communicate with
- * the client.
- *
- * @return The socket channel that can be used to communicate with the
- * client.
- */
- @Override
- public SocketChannel getSocketChannel()
- {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Retrieves the protocol that the client is using to communicate with
- * the Directory Server.
- *
- * @return The protocol that the client is using to communicate with
- * the Directory Server.
- */
- @Override
- public String getProtocol()
- {
- return protocol;
- }
-
- /**
- * Retrieves a string representation of the address of the client.
- *
- * @return A string representation of the address of the client.
- */
- @Override
- public String getClientAddress()
- {
- return clientAddress;
- }
-
- /**
- * Retrieves the port number for this connection on the client system.
- *
- * @return The port number for this connection on the client system.
- */
- @Override
- public int getClientPort()
- {
- return clientPort;
- }
-
- /**
- * Retrieves a string representation of the address on the server to
- * which the client connected.
- *
- * @return A string representation of the address on the server to
- * which the client connected.
- */
- @Override
- public String getServerAddress()
- {
- return serverAddress;
- }
-
- /**
- * Retrieves the port number for this connection on the server system.
- *
- * @return The port number for this connection on the server system.
- */
- @Override
- public int getServerPort()
- {
- return serverPort;
- }
-
- /**
- * Retrieves the <CODE>java.net.InetAddress</CODE> associated with the
- * remote client system.
- *
- * @return The <CODE>java.net.InetAddress</CODE> associated with the
- * remote client system. It may be <CODE>null</CODE> if the
- * client is not connected over an IP-based connection.
- */
- @Override
- public InetAddress getRemoteAddress()
- {
- return clientContext.getPeerAddress().getAddress();
- }
-
- /**
- * Retrieves the <CODE>java.net.InetAddress</CODE> for the Directory
- * Server system to which the client has established the connection.
- *
- * @return The <CODE>java.net.InetAddress</CODE> for the Directory
- * Server system to which the client has established the
- * connection. It may be <CODE>null</CODE> if the client is
- * not connected over an IP-based connection.
- */
- @Override
- public InetAddress getLocalAddress()
- {
- return clientContext.getLocalAddress().getAddress();
- }
-
- @Override
- public boolean isConnectionValid()
- {
- return this.connectionValid;
- }
-
- /**
- * Indicates whether this client connection is currently using a
- * secure mechanism to communicate with the server. Note that this may
- * change over time based on operations performed by the client or
- * server (e.g., it may go from <CODE>false</CODE> to
- * <CODE>true</CODE> if the client uses the StartTLS extended
- * operation).
- *
- * @return <CODE>true</CODE> if the client connection is currently
- * using a secure mechanism to communicate with the server, or
- * <CODE>false</CODE> if not.
- */
- @Override
- public boolean isSecure()
- {
- return false;
- }
-
- /**
- * Sends a response to the client based on the information in the
- * provided operation.
- *
- * @param operation
- * The operation for which to send the response.
- */
- @Override
- public void sendResponse(Operation operation)
- {
- // Since this is the final response for this operation, we can go
- // ahead and remove it from the "operations in progress" list. It
- // can't be canceled after this point, and this will avoid potential
- // race conditions in which the client immediately sends another
- // request with the same message ID as was used for this operation.
-
- if (keepStats) {
- long time;
- if (useNanoTime) {
- time = operation.getProcessingNanoTime();
+ statTracker = this.connectionHandler.getStatTracker();
+ if (keepStats) {
+ statTracker.updateConnect();
+ this.useNanoTime = DirectoryServer.getUseNanoTime();
} else {
- time = operation.getProcessingTime();
+ this.useNanoTime = false;
}
- this.statTracker.updateOperationMonitoringData(
- operation.getOperationType(),
- time);
+
+ connectionID = DirectoryServer.newConnectionAccepted(this);
+ clientContext.onDisconnect(new DisconnectListener() {
+ @Override
+ public void exceptionOccurred(LDAPClientContext context, Throwable error) {
+ if (error instanceof LocalizableException) {
+ disconnect(
+ DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
+ } else {
+ disconnect(DisconnectReason.PROTOCOL_ERROR, true, null);
+ }
+ }
+
+ @Override
+ public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
+ String diagnosticMessage) {
+ disconnect(DisconnectReason.SERVER_ERROR, false, null);
+ }
+
+ @Override
+ public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
+ disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
+ }
+ });
}
- // Avoid sending the response if one has already been sent. This may happen
- // if operation processing encounters a run-time exception after sending the
- // response: the worker thread exception handling code will attempt to send
- // an error result to the client indicating that a problem occurred.
- if (removeOperationInProgress(operation.getMessageID()))
- {
- final Response response = operationToResponse(operation);
- final FlowableEmitter<Response> out = getOut(operation);
- if (response != null)
- {
+ /**
+ * Retrieves the connection ID assigned to this connection.
+ *
+ * @return The connection ID assigned to this connection.
+ */
+ @Override
+ public long getConnectionID() {
+ return connectionID;
+ }
+
+ /**
+ * Retrieves the connection handler that accepted this client connection.
+ *
+ * @return The connection handler that accepted this client connection.
+ */
+ @Override
+ public ConnectionHandler<?> getConnectionHandler() {
+ return connectionHandler;
+ }
+
+ /**
+ * Retrieves the socket channel that can be used to communicate with the client.
+ *
+ * @return The socket channel that can be used to communicate with the client.
+ */
+ @Override
+ public SocketChannel getSocketChannel() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Retrieves the protocol that the client is using to communicate with the Directory Server.
+ *
+ * @return The protocol that the client is using to communicate with the Directory Server.
+ */
+ @Override
+ public String getProtocol() {
+ return protocol;
+ }
+
+ /**
+ * Retrieves a string representation of the address of the client.
+ *
+ * @return A string representation of the address of the client.
+ */
+ @Override
+ public String getClientAddress() {
+ return clientAddress;
+ }
+
+ /**
+ * Retrieves the port number for this connection on the client system.
+ *
+ * @return The port number for this connection on the client system.
+ */
+ @Override
+ public int getClientPort() {
+ return clientPort;
+ }
+
+ /**
+ * Retrieves a string representation of the address on the server to which the client connected.
+ *
+ * @return A string representation of the address on the server to which the client connected.
+ */
+ @Override
+ public String getServerAddress() {
+ return serverAddress;
+ }
+
+ /**
+ * Retrieves the port number for this connection on the server system.
+ *
+ * @return The port number for this connection on the server system.
+ */
+ @Override
+ public int getServerPort() {
+ return serverPort;
+ }
+
+ /**
+ * Retrieves the <CODE>java.net.InetAddress</CODE> associated with the remote client system.
+ *
+ * @return The <CODE>java.net.InetAddress</CODE> associated with the remote client system. It may be
+ * <CODE>null</CODE> if the client is not connected over an IP-based connection.
+ */
+ @Override
+ public InetAddress getRemoteAddress() {
+ return clientContext.getPeerAddress().getAddress();
+ }
+
+ /**
+ * Retrieves the <CODE>java.net.InetAddress</CODE> for the Directory Server system to which the client has
+ * established the connection.
+ *
+ * @return The <CODE>java.net.InetAddress</CODE> for the Directory Server system to which the client has established
+ * the connection. It may be <CODE>null</CODE> if the client is not connected over an IP-based connection.
+ */
+ @Override
+ public InetAddress getLocalAddress() {
+ return clientContext.getLocalAddress().getAddress();
+ }
+
+ @Override
+ public boolean isConnectionValid() {
+ return this.connectionValid;
+ }
+
+ /**
+ * Indicates whether this client connection is currently using a secure mechanism to communicate with the server.
+ * Note that this may change over time based on operations performed by the client or server (e.g., it may go from
+ * <CODE>false</CODE> to <CODE>true</CODE> if the client uses the StartTLS extended operation).
+ *
+ * @return <CODE>true</CODE> if the client connection is currently using a secure mechanism to communicate with the
+ * server, or <CODE>false</CODE> if not.
+ */
+ @Override
+ public boolean isSecure() {
+ return false;
+ }
+
+ /**
+ * Sends a response to the client based on the information in the provided operation.
+ *
+ * @param operation
+ * The operation for which to send the response.
+ */
+ @Override
+ public void sendResponse(Operation operation) {
+ // Since this is the final response for this operation, we can go
+ // ahead and remove it from the "operations in progress" list. It
+ // can't be canceled after this point, and this will avoid potential
+ // race conditions in which the client immediately sends another
+ // request with the same message ID as was used for this operation.
+
+ if (keepStats) {
+ long time;
+ if (useNanoTime) {
+ time = operation.getProcessingNanoTime();
+ } else {
+ time = operation.getProcessingTime();
+ }
+ this.statTracker.updateOperationMonitoringData(operation.getOperationType(), time);
+ }
+
+ // Avoid sending the response if one has already been sent. This may happen
+ // if operation processing encounters a run-time exception after sending the
+ // response: the worker thread exception handling code will attempt to send
+ // an error result to the client indicating that a problem occurred.
+ if (removeOperationInProgress(operation.getMessageID())) {
+ final Response response = operationToResponse(operation);
+ final FlowableEmitter<Response> out = getOut(operation);
+ if (response != null) {
+ out.onNext(response);
+ }
+ out.onComplete();
+ }
+ }
+
+ /**
+ * Retrieves an LDAPMessage containing a response generated from the provided operation.
+ *
+ * @param operation
+ * The operation to use to generate the response LDAPMessage.
+ * @return An LDAPMessage containing a response generated from the provided operation.
+ */
+ private Response operationToResponse(Operation operation) {
+ ResultCode resultCode = operation.getResultCode();
+ if (resultCode == null) {
+ // This must mean that the operation has either not yet completed
+ // or that it completed without a result for some reason. In any
+ // case, log a message and set the response to "operations error".
+ logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_NO_RESULT_CODE, operation.getOperationType(),
+ operation.getConnectionID(), operation.getOperationID());
+ resultCode = DirectoryServer.getServerErrorResultCode();
+ }
+
+ LocalizableMessageBuilder errorMessage = operation.getErrorMessage();
+ String matchedDN = operation.getMatchedDN() != null ? operation.getMatchedDN().toString() : null;
+
+ // Referrals are not allowed for LDAPv2 clients.
+ List<String> referralURLs;
+ if (ldapVersion == 2) {
+ referralURLs = null;
+
+ if (resultCode == ResultCode.REFERRAL) {
+ resultCode = ResultCode.CONSTRAINT_VIOLATION;
+ errorMessage.append(ERR_LDAPV2_REFERRAL_RESULT_CHANGED.get());
+ }
+
+ List<String> opReferrals = operation.getReferralURLs();
+ if (opReferrals != null && !opReferrals.isEmpty()) {
+ StringBuilder referralsStr = new StringBuilder();
+ Iterator<String> iterator = opReferrals.iterator();
+ referralsStr.append(iterator.next());
+
+ while (iterator.hasNext()) {
+ referralsStr.append(", ");
+ referralsStr.append(iterator.next());
+ }
+
+ errorMessage.append(ERR_LDAPV2_REFERRALS_OMITTED.get(referralsStr));
+ }
+ } else {
+ referralURLs = operation.getReferralURLs();
+ }
+
+ final Result result;
+ switch (operation.getOperationType()) {
+ case ADD:
+ result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN);
+ break;
+ case BIND:
+ result = Responses.newBindResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN)
+ .setServerSASLCredentials(((BindOperationBasis) operation).getServerSASLCredentials());
+ break;
+ case COMPARE:
+ result = Responses.newCompareResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN);
+ break;
+ case DELETE:
+ result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN);
+ break;
+ case EXTENDED:
+ // If this an LDAPv2 client, then we can't send this.
+ if (ldapVersion == 2) {
+ logger.error(ERR_LDAPV2_SKIPPING_EXTENDED_RESPONSE, getConnectionID(), operation.getOperationID(),
+ operation);
+ return null;
+ }
+
+ ExtendedOperationBasis extOp = (ExtendedOperationBasis) operation;
+ result = Responses.newGenericExtendedResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN).setOID(extOp.getResponseOID()).setValue(extOp.getResponseValue());
+ break;
+ case MODIFY:
+ result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN);
+ break;
+ case MODIFY_DN:
+ result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN);
+ break;
+ case SEARCH:
+ result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN);
+ break;
+ default:
+ // This must be a type of operation that doesn't have a response.
+ // This shouldn't happen, so log a message and return.
+ logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_INVALID_OP, operation.getOperationType(), getConnectionID(),
+ operation.getOperationID(), operation);
+ return null;
+ }
+ if (referralURLs != null) {
+ result.getReferralURIs().addAll(referralURLs);
+ }
+
+ // Controls are not allowed for LDAPv2 clients.
+ if (ldapVersion != 2) {
+ for (Control control : operation.getResponseControls()) {
+ result.addControl(Converters.from(control));
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Sends the provided search result entry to the client.
+ *
+ * @param searchOperation
+ * The search operation with which the entry is associated
+ * @param searchEntry
+ * The search result entry to be sent to the client
+ */
+ @Override
+ public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry) {
+ getEmitter(searchOperation).onNext(toResponse(searchEntry));
+ }
+
+ private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation) {
+ return getOut(searchOperation);
+ }
+
+ private Response toResponse(SearchResultEntry searchEntry) {
+ return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
+ }
+
+ private FlowableEmitter<Response> getOut(Operation operation) {
+ return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
+ }
+
+ /**
+ * Sends the provided search result reference to the client.
+ *
+ * @param searchOperation
+ * The search operation with which the reference is associated.
+ * @param searchReference
+ * The search result reference to be sent to the client.
+ * @return <CODE>true</CODE> if the client is able to accept referrals, or <CODE>false</CODE> if the client cannot
+ * handle referrals and no more attempts should be made to send them for the associated search operation.
+ */
+ @Override
+ public boolean sendSearchReference(SearchOperation searchOperation, SearchResultReference searchReference) {
+ // Make sure this is not an LDAPv2 client. If it is, then they can't
+ // see referrals so we'll not send anything. Also, throw an
+ // exception so that the core server will know not to try sending
+ // any more referrals to this client for the rest of the operation.
+ if (ldapVersion == 2) {
+ logger.error(ERR_LDAPV2_SKIPPING_SEARCH_REFERENCE, getConnectionID(), searchOperation.getOperationID(),
+ searchReference);
+ return false;
+ }
+
+ final FlowableEmitter<Response> out = getOut(searchOperation);
+ out.onNext(Converters.from(searchReference));
+
+ return true;
+ }
+
+ /**
+ * Sends the provided intermediate response message to the client.
+ *
+ * @param intermediateResponse
+ * The intermediate response message to be sent.
+ * @return <CODE>true</CODE> if processing on the associated operation should continue, or <CODE>false</CODE> if
+ * not.
+ */
+ @Override
+ protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse) {
+ final Operation operation = intermediateResponse.getOperation();
+ final FlowableEmitter<Response> out = getOut(operation);
+
+ final Response response = Responses.newGenericIntermediateResponse(intermediateResponse.getOID(),
+ intermediateResponse.getValue());
+ for (Control control : intermediateResponse.getControls()) {
+ response.addControl(Converters.from(control));
+ }
+
out.onNext(response);
- }
- out.onComplete();
- }
- }
- /**
- * Retrieves an LDAPMessage containing a response generated from the
- * provided operation.
- *
- * @param operation
- * The operation to use to generate the response LDAPMessage.
- * @return An LDAPMessage containing a response generated from the
- * provided operation.
- */
- private Response operationToResponse(Operation operation)
- {
- ResultCode resultCode = operation.getResultCode();
- if (resultCode == null)
- {
- // This must mean that the operation has either not yet completed
- // or that it completed without a result for some reason. In any
- // case, log a message and set the response to "operations error".
- logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_NO_RESULT_CODE, operation.getOperationType(),
- operation.getConnectionID(), operation.getOperationID());
- resultCode = DirectoryServer.getServerErrorResultCode();
+ // The only reason we shouldn't continue processing is if the
+ // connection is closed.
+ return connectionValid;
}
- LocalizableMessageBuilder errorMessage = operation.getErrorMessage();
- String matchedDN = operation.getMatchedDN() != null ? operation.getMatchedDN().toString() : null;
-
- // Referrals are not allowed for LDAPv2 clients.
- List<String> referralURLs;
- if (ldapVersion == 2)
- {
- referralURLs = null;
-
- if (resultCode == ResultCode.REFERRAL)
- {
- resultCode = ResultCode.CONSTRAINT_VIOLATION;
- errorMessage.append(ERR_LDAPV2_REFERRAL_RESULT_CHANGED.get());
- }
-
- List<String> opReferrals = operation.getReferralURLs();
- if (opReferrals != null && !opReferrals.isEmpty())
- {
- StringBuilder referralsStr = new StringBuilder();
- Iterator<String> iterator = opReferrals.iterator();
- referralsStr.append(iterator.next());
-
- while (iterator.hasNext())
- {
- referralsStr.append(", ");
- referralsStr.append(iterator.next());
- }
-
- errorMessage.append(ERR_LDAPV2_REFERRALS_OMITTED.get(referralsStr));
- }
- }
- else
- {
- referralURLs = operation.getReferralURLs();
- }
-
- final Result result;
- switch (operation.getOperationType())
- {
- case ADD:
- result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
- break;
- case BIND:
- result = Responses.newBindResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN)
- .setServerSASLCredentials(((BindOperationBasis) operation).getServerSASLCredentials());
- break;
- case COMPARE:
- result = Responses.newCompareResult(resultCode)
- .setDiagnosticMessage(errorMessage.toString())
- .setMatchedDN(matchedDN);
- break;
- case DELETE:
- result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
- break;
- case EXTENDED:
- // If this an LDAPv2 client, then we can't send this.
- if (ldapVersion == 2)
- {
- logger.error(ERR_LDAPV2_SKIPPING_EXTENDED_RESPONSE,
- getConnectionID(), operation.getOperationID(), operation);
- return null;
- }
-
- ExtendedOperationBasis extOp = (ExtendedOperationBasis) operation;
- result = Responses.newGenericExtendedResult(resultCode)
- .setDiagnosticMessage(errorMessage.toString())
- .setMatchedDN(matchedDN)
- .setOID(extOp.getResponseOID()).setValue(extOp.getResponseValue());
- break;
- case MODIFY:
- result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
- break;
- case MODIFY_DN:
- result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
- break;
- case SEARCH:
- result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
- break;
- default:
- // This must be a type of operation that doesn't have a response.
- // This shouldn't happen, so log a message and return.
- logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_INVALID_OP, operation.getOperationType(), getConnectionID(),
- operation.getOperationID(), operation);
- return null;
- }
- if (referralURLs != null)
- {
- result.getReferralURIs().addAll(referralURLs);
- }
-
- // Controls are not allowed for LDAPv2 clients.
- if (ldapVersion != 2)
- {
- for(Control control : operation.getResponseControls()) {
- result.addControl(Converters.from(control));
- }
- }
-
- return result;
- }
-
- /**
- * Sends the provided search result entry to the client.
- *
- * @param searchOperation
- * The search operation with which the entry is associated
- * @param searchEntry
- * The search result entry to be sent to the client
- */
- @Override
- public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry)
- {
- getEmitter(searchOperation).onNext(toResponse(searchEntry));
- }
-
- private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation)
- {
- return getOut(searchOperation);
- }
-
- private Response toResponse(SearchResultEntry searchEntry)
- {
- return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
- }
-
- private FlowableEmitter<Response> getOut(Operation operation)
- {
- return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
- }
-
- /**
- * Sends the provided search result reference to the client.
- *
- * @param searchOperation
- * The search operation with which the reference is
- * associated.
- * @param searchReference
- * The search result reference to be sent to the client.
- * @return <CODE>true</CODE> if the client is able to accept
- * referrals, or <CODE>false</CODE> if the client cannot
- * handle referrals and no more attempts should be made to
- * send them for the associated search operation.
- */
- @Override
- public boolean sendSearchReference(SearchOperation searchOperation, SearchResultReference searchReference)
- {
- // Make sure this is not an LDAPv2 client. If it is, then they can't
- // see referrals so we'll not send anything. Also, throw an
- // exception so that the core server will know not to try sending
- // any more referrals to this client for the rest of the operation.
- if (ldapVersion == 2)
- {
- logger.error(ERR_LDAPV2_SKIPPING_SEARCH_REFERENCE, getConnectionID(),
- searchOperation.getOperationID(), searchReference);
- return false;
- }
-
- final FlowableEmitter<Response> out = getOut(searchOperation);
- out.onNext(Converters.from(searchReference));
-
- return true;
- }
-
- /**
- * Sends the provided intermediate response message to the client.
- *
- * @param intermediateResponse
- * The intermediate response message to be sent.
- * @return <CODE>true</CODE> if processing on the associated operation
- * should continue, or <CODE>false</CODE> if not.
- */
- @Override
- protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse)
- {
- final Operation operation = intermediateResponse.getOperation();
- final FlowableEmitter<Response> out = getOut(operation);
-
- final Response response =
- Responses.newGenericIntermediateResponse(intermediateResponse.getOID(), intermediateResponse.getValue());
- for (Control control : intermediateResponse.getControls())
- {
- response.addControl(Converters.from(control));
- }
-
- out.onNext(response);
-
- // The only reason we shouldn't continue processing is if the
- // connection is closed.
- return connectionValid;
- }
-
- /**
- * Closes the connection to the client, optionally sending it a
- * message indicating the reason for the closure. Note that the
- * ability to send a notice of disconnection may not be available for
- * all protocols or under all circumstances.
- *
- * @param disconnectReason
- * The disconnect reason that provides the generic cause for
- * the disconnect.
- * @param sendNotification
- * Indicates whether to try to provide notification to the
- * client that the connection will be closed.
- * @param message
- * The message to include in the disconnect notification
- * response. It may be <CODE>null</CODE> if no message is to
- * be sent.
- */
- @Override
- public void disconnect(DisconnectReason disconnectReason,
- boolean sendNotification, LocalizableMessage message)
- {
- // Set a flag indicating that the connection is being terminated so
- // that no new requests will be accepted. Also cancel all operations
- // in progress.
- synchronized (opsInProgressLock)
- {
- // If we are already in the middle of a disconnect, then don't
- // do anything.
- if (disconnectRequested)
- {
- return;
- }
-
- disconnectRequested = true;
- }
-
- if (keepStats)
- {
- statTracker.updateDisconnect();
- }
-
- if (connectionID >= 0)
- {
- DirectoryServer.connectionClosed(this);
- }
-
- // Indicate that this connection is no longer valid.
- connectionValid = false;
-
- final LocalizableMessage cancelMessage;
- if (message != null)
- {
- cancelMessage = new LocalizableMessageBuilder()
- .append(disconnectReason.getClosureMessage())
- .append(": ")
- .append(message)
- .toMessage();
- }
- else
- {
- cancelMessage = disconnectReason.getClosureMessage();
- }
- cancelAllOperations(new CancelRequest(true, cancelMessage));
- finalizeConnectionInternal();
-
- // See if we should send a notification to the client. If so, then
- // construct and send a notice of disconnection unsolicited
- // response. Note that we cannot send this notification to an LDAPv2 client.
- if (sendNotification && ldapVersion != 2)
- {
- try
- {
- LocalizableMessage errMsg = message != null ? message : INFO_LDAP_CLIENT_GENERIC_NOTICE_OF_DISCONNECTION.get();
- clientContext.disconnect(ResultCode.valueOf(toResultCode(disconnectReason)), errMsg.toString());
- }
- catch (Exception e)
- {
- // NYI -- Log a message indicating that we couldn't send the
- // notice of disconnection.
- logger.traceException(e);
- }
- }
- else
- {
- clientContext.disconnect();
- }
-
- // NYI -- Deregister the client connection from any server components that
- // might know about it.
-
- logDisconnect(this, disconnectReason, message);
-
- try
- {
- PluginConfigManager pluginManager = DirectoryServer.getPluginConfigManager();
- pluginManager.invokePostDisconnectPlugins(this, disconnectReason, message);
- }
- catch (Exception e)
- {
- logger.traceException(e);
- }
- }
-
- private int toResultCode(DisconnectReason disconnectReason)
- {
- switch (disconnectReason)
- {
- case PROTOCOL_ERROR:
- return LDAPResultCode.PROTOCOL_ERROR;
- case SERVER_SHUTDOWN:
- return LDAPResultCode.UNAVAILABLE;
- case SERVER_ERROR:
- return DirectoryServer.getServerErrorResultCode().intValue();
- case ADMIN_LIMIT_EXCEEDED:
- case IDLE_TIME_LIMIT_EXCEEDED:
- case MAX_REQUEST_SIZE_EXCEEDED:
- case IO_TIMEOUT:
- return LDAPResultCode.ADMIN_LIMIT_EXCEEDED;
- case CONNECTION_REJECTED:
- return LDAPResultCode.CONSTRAINT_VIOLATION;
- case INVALID_CREDENTIALS:
- return LDAPResultCode.INVALID_CREDENTIALS;
- default:
- return LDAPResultCode.OTHER;
- }
- }
-
- /**
- * Retrieves the set of operations in progress for this client
- * connection. This list must not be altered by any caller.
- *
- * @return The set of operations in progress for this client
- * connection.
- */
- @Override
- public Collection<Operation> getOperationsInProgress()
- {
- return operationsInProgress.values();
- }
-
- /**
- * Retrieves the operation in progress with the specified message ID.
- *
- * @param messageID
- * The message ID for the operation to retrieve.
- * @return The operation in progress with the specified message ID, or
- * <CODE>null</CODE> if no such operation could be found.
- */
- @Override
- public Operation getOperationInProgress(int messageID)
- {
- return operationsInProgress.get(messageID);
- }
-
- /**
- * Adds the provided operation to the set of operations in progress
- * for this client connection.
- *
- * @param operation
- * The operation to add to the set of operations in progress
- * for this client connection.
- * @throws DirectoryException
- * If the operation is not added for some reason (e.g., the
- * client already has reached the maximum allowed concurrent
- * requests).
- */
- private void addOperationInProgress(final QueueingStrategy queueingStrategy, Operation operation)
- throws DirectoryException
- {
- int messageID = operation.getMessageID();
-
- // We need to grab a lock to ensure that no one else can add
- // operations to the queue while we are performing some preliminary
- // checks.
- try
- {
- synchronized (opsInProgressLock)
- {
- // If we're already in the process of disconnecting the client,
- // then reject the operation.
- if (disconnectRequested)
- {
- LocalizableMessage message = WARN_CLIENT_DISCONNECT_IN_PROGRESS.get();
- throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
- message);
- }
-
- // Add the operation to the list of operations in progress for
- // this connection.
- Operation op = operationsInProgress.putIfAbsent(messageID, operation);
-
- // See if there is already an operation in progress with the
- // same message ID. If so, then we can't allow it.
- if (op != null)
- {
- LocalizableMessage message =
- WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
- throw new DirectoryException(ResultCode.PROTOCOL_ERROR,
- message);
- }
- }
-
- // Try to add the operation to the work queue,
- // or run it synchronously (typically for the administration
- // connector)
- queueingStrategy.enqueueRequest(operation);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- operationsInProgress.remove(messageID);
- lastCompletionTime.set(TimeThread.getTime());
-
- throw de;
- }
- catch (Exception e)
- {
- logger.traceException(e);
-
- LocalizableMessage message =
- WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
- throw new DirectoryException(DirectoryServer
- .getServerErrorResultCode(), message, e);
- }
- }
-
- /**
- * Removes the provided operation from the set of operations in
- * progress for this client connection. Note that this does not make
- * any attempt to cancel any processing that may already be in
- * progress for the operation.
- *
- * @param messageID
- * The message ID of the operation to remove from the set of
- * operations in progress.
- * @return <CODE>true</CODE> if the operation was found and removed
- * from the set of operations in progress, or
- * <CODE>false</CODE> if not.
- */
- @Override
- public boolean removeOperationInProgress(int messageID)
- {
- Operation operation = operationsInProgress.remove(messageID);
- if (operation == null)
- {
- return false;
- }
-
- if (operation.getOperationType() == OperationType.ABANDON
- && keepStats
- && operation.getResultCode() == ResultCode.CANCELLED)
- {
- statTracker.updateAbandonedOperation();
- }
-
- lastCompletionTime.set(TimeThread.getTime());
- return true;
- }
-
- /**
- * Attempts to cancel the specified operation.
- *
- * @param messageID
- * The message ID of the operation to cancel.
- * @param cancelRequest
- * An object providing additional information about how the
- * cancel should be processed.
- * @return A cancel result that either indicates that the cancel was
- * successful or provides a reason that it was not.
- */
- @Override
- public CancelResult cancelOperation(int messageID,
- CancelRequest cancelRequest)
- {
- Operation op = operationsInProgress.get(messageID);
- if (op != null)
- {
- return op.cancel(cancelRequest);
- }
-
- // See if the operation is in the list of persistent searches.
- for (PersistentSearch ps : getPersistentSearches())
- {
- if (ps.getMessageID() == messageID)
- {
- // We only need to find the first persistent search
- // associated with the provided message ID. The persistent search
- // will ensure that all other related persistent searches are cancelled.
- return ps.cancel();
- }
- }
- return new CancelResult(ResultCode.NO_SUCH_OPERATION, null);
- }
-
- /**
- * Attempts to cancel all operations in progress on this connection.
- *
- * @param cancelRequest
- * An object providing additional information about how the
- * cancel should be processed.
- */
- @Override
- public void cancelAllOperations(CancelRequest cancelRequest)
- {
- // Make sure that no one can add any new operations.
- synchronized (opsInProgressLock)
- {
- try
- {
- for (Operation o : operationsInProgress.values())
- {
- try
- {
- o.abort(cancelRequest);
-
- // TODO: Assume its cancelled?
- if (keepStats)
- {
- statTracker.updateAbandonedOperation();
+ /**
+ * Closes the connection to the client, optionally sending it a message indicating the reason for the closure. Note
+ * that the ability to send a notice of disconnection may not be available for all protocols or under all
+ * circumstances.
+ *
+ * @param disconnectReason
+ * The disconnect reason that provides the generic cause for the disconnect.
+ * @param sendNotification
+ * Indicates whether to try to provide notification to the client that the connection will be closed.
+ * @param message
+ * The message to include in the disconnect notification response. It may be <CODE>null</CODE> if no
+ * message is to be sent.
+ */
+ @Override
+ public void disconnect(DisconnectReason disconnectReason, boolean sendNotification, LocalizableMessage message) {
+ // Set a flag indicating that the connection is being terminated so
+ // that no new requests will be accepted. Also cancel all operations
+ // in progress.
+ synchronized (opsInProgressLock) {
+ // If we are already in the middle of a disconnect, then don't
+ // do anything.
+ if (disconnectRequested) {
+ return;
}
- }
- catch (Exception e)
- {
+
+ disconnectRequested = true;
+ }
+
+ if (keepStats) {
+ statTracker.updateDisconnect();
+ }
+
+ if (connectionID >= 0) {
+ DirectoryServer.connectionClosed(this);
+ }
+
+ // Indicate that this connection is no longer valid.
+ connectionValid = false;
+
+ final LocalizableMessage cancelMessage;
+ if (message != null) {
+ cancelMessage = new LocalizableMessageBuilder().append(disconnectReason.getClosureMessage()).append(": ")
+ .append(message).toMessage();
+ } else {
+ cancelMessage = disconnectReason.getClosureMessage();
+ }
+ cancelAllOperations(new CancelRequest(true, cancelMessage));
+ finalizeConnectionInternal();
+
+ // See if we should send a notification to the client. If so, then
+ // construct and send a notice of disconnection unsolicited
+ // response. Note that we cannot send this notification to an LDAPv2 client.
+ if (sendNotification && ldapVersion != 2) {
+ try {
+ LocalizableMessage errMsg = message != null ? message
+ : INFO_LDAP_CLIENT_GENERIC_NOTICE_OF_DISCONNECTION.get();
+ clientContext.disconnect(ResultCode.valueOf(toResultCode(disconnectReason)), errMsg.toString());
+ } catch (Exception e) {
+ // NYI -- Log a message indicating that we couldn't send the
+ // notice of disconnection.
+ logger.traceException(e);
+ }
+ } else {
+ clientContext.disconnect();
+ }
+
+ // NYI -- Deregister the client connection from any server components that
+ // might know about it.
+
+ logDisconnect(this, disconnectReason, message);
+
+ try {
+ PluginConfigManager pluginManager = DirectoryServer.getPluginConfigManager();
+ pluginManager.invokePostDisconnectPlugins(this, disconnectReason, message);
+ } catch (Exception e) {
logger.traceException(e);
- }
}
-
- if (!operationsInProgress.isEmpty()
- || !getPersistentSearches().isEmpty())
- {
- lastCompletionTime.set(TimeThread.getTime());
- }
-
- operationsInProgress.clear();
-
- for (PersistentSearch persistentSearch : getPersistentSearches())
- {
- persistentSearch.cancel();
- }
- }
- catch (Exception e)
- {
- logger.traceException(e);
- }
}
- }
- /**
- * Attempts to cancel all operations in progress on this connection
- * except the operation with the specified message ID.
- *
- * @param cancelRequest
- * An object providing additional information about how the
- * cancel should be processed.
- * @param messageID
- * The message ID of the operation that should not be
- * canceled.
- */
- @Override
- public void cancelAllOperationsExcept(CancelRequest cancelRequest,
- int messageID)
- {
- // Make sure that no one can add any new operations.
- synchronized (opsInProgressLock)
- {
- try
- {
- for (int msgID : operationsInProgress.keySet())
- {
- if (msgID == messageID)
- {
- continue;
- }
-
- Operation o = operationsInProgress.get(msgID);
- if (o != null)
- {
- try
- {
- o.abort(cancelRequest);
-
- // TODO: Assume its cancelled?
- if (keepStats)
- {
- statTracker.updateAbandonedOperation();
- }
- }
- catch (Exception e)
- {
- logger.traceException(e);
- }
- }
-
- operationsInProgress.remove(msgID);
- lastCompletionTime.set(TimeThread.getTime());
+ private int toResultCode(DisconnectReason disconnectReason) {
+ switch (disconnectReason) {
+ case PROTOCOL_ERROR:
+ return LDAPResultCode.PROTOCOL_ERROR;
+ case SERVER_SHUTDOWN:
+ return LDAPResultCode.UNAVAILABLE;
+ case SERVER_ERROR:
+ return DirectoryServer.getServerErrorResultCode().intValue();
+ case ADMIN_LIMIT_EXCEEDED:
+ case IDLE_TIME_LIMIT_EXCEEDED:
+ case MAX_REQUEST_SIZE_EXCEEDED:
+ case IO_TIMEOUT:
+ return LDAPResultCode.ADMIN_LIMIT_EXCEEDED;
+ case CONNECTION_REJECTED:
+ return LDAPResultCode.CONSTRAINT_VIOLATION;
+ case INVALID_CREDENTIALS:
+ return LDAPResultCode.INVALID_CREDENTIALS;
+ default:
+ return LDAPResultCode.OTHER;
}
-
- for (PersistentSearch persistentSearch : getPersistentSearches())
- {
- if (persistentSearch.getMessageID() == messageID)
- {
- continue;
- }
-
- persistentSearch.cancel();
- lastCompletionTime.set(TimeThread.getTime());
- }
- }
- catch (Exception e)
- {
- logger.traceException(e);
- }
}
- }
- @Override
- public Selector getWriteSelector()
- {
- throw new UnsupportedOperationException();
- }
+ /**
+ * Retrieves the set of operations in progress for this client connection. This list must not be altered by any
+ * caller.
+ *
+ * @return The set of operations in progress for this client connection.
+ */
+ @Override
+ public Collection<Operation> getOperationsInProgress() {
+ return operationsInProgress.values();
+ }
- @Override
- public long getMaxBlockedWriteTimeLimit()
- {
- return connectionHandler.getMaxBlockedWriteTimeLimit();
- }
+ /**
+ * Retrieves the operation in progress with the specified message ID.
+ *
+ * @param messageID
+ * The message ID for the operation to retrieve.
+ * @return The operation in progress with the specified message ID, or <CODE>null</CODE> if no such operation could
+ * be found.
+ */
+ @Override
+ public Operation getOperationInProgress(int messageID) {
+ return operationsInProgress.get(messageID);
+ }
- /**
- * Returns the total number of operations initiated on this
- * connection.
- *
- * @return the total number of operations on this connection
- */
- @Override
- public long getNumberOfOperations()
- {
- return operationsPerformed.get();
- }
+ /**
+ * Adds the provided operation to the set of operations in progress for this client connection.
+ *
+ * @param operation
+ * The operation to add to the set of operations in progress for this client connection.
+ * @throws DirectoryException
+ * If the operation is not added for some reason (e.g., the client already has reached the maximum
+ * allowed concurrent requests).
+ */
+ private void addOperationInProgress(final QueueingStrategy queueingStrategy, Operation operation)
+ throws DirectoryException {
+ int messageID = operation.getMessageID();
- /**
- * Processes the provided LDAP message read from the client and takes
- * whatever action is appropriate. For most requests, this will
- * include placing the operation in the work queue. Certain requests
- * (in particular, abandons and unbinds) will be processed directly.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message to process.
- * @return <CODE>true</CODE> if the appropriate action was taken for
- * the request, or <CODE>false</CODE> if there was a fatal
- * error and the client has been disconnected as a result, or
- * if the client unbound from the server.
- */
+ // We need to grab a lock to ensure that no one else can add
+ // operations to the queue while we are performing some preliminary
+ // checks.
+ try {
+ synchronized (opsInProgressLock) {
+ // If we're already in the process of disconnecting the client,
+ // then reject the operation.
+ if (disconnectRequested) {
+ LocalizableMessage message = WARN_CLIENT_DISCONNECT_IN_PROGRESS.get();
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
+ }
+
+ // Add the operation to the list of operations in progress for
+ // this connection.
+ Operation op = operationsInProgress.putIfAbsent(messageID, operation);
+
+ // See if there is already an operation in progress with the
+ // same message ID. If so, then we can't allow it.
+ if (op != null) {
+ LocalizableMessage message = WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
+ throw new DirectoryException(ResultCode.PROTOCOL_ERROR, message);
+ }
+ }
+
+ // Try to add the operation to the work queue,
+ // or run it synchronously (typically for the administration
+ // connector)
+ queueingStrategy.enqueueRequest(operation);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ operationsInProgress.remove(messageID);
+ lastCompletionTime.set(TimeThread.getTime());
+
+ throw de;
+ } catch (Exception e) {
+ logger.traceException(e);
+
+ LocalizableMessage message = WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
+ throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message, e);
+ }
+ }
+
+ /**
+ * Removes the provided operation from the set of operations in progress for this client connection. Note that this
+ * does not make any attempt to cancel any processing that may already be in progress for the operation.
+ *
+ * @param messageID
+ * The message ID of the operation to remove from the set of operations in progress.
+ * @return <CODE>true</CODE> if the operation was found and removed from the set of operations in progress, or
+ * <CODE>false</CODE> if not.
+ */
+ @Override
+ public boolean removeOperationInProgress(int messageID) {
+ Operation operation = operationsInProgress.remove(messageID);
+ if (operation == null) {
+ return false;
+ }
+
+ if (operation.getOperationType() == OperationType.ABANDON && keepStats
+ && operation.getResultCode() == ResultCode.CANCELLED) {
+ statTracker.updateAbandonedOperation();
+ }
+
+ lastCompletionTime.set(TimeThread.getTime());
+ return true;
+ }
+
+ /**
+ * Attempts to cancel the specified operation.
+ *
+ * @param messageID
+ * The message ID of the operation to cancel.
+ * @param cancelRequest
+ * An object providing additional information about how the cancel should be processed.
+ * @return A cancel result that either indicates that the cancel was successful or provides a reason that it was
+ * not.
+ */
+ @Override
+ public CancelResult cancelOperation(int messageID, CancelRequest cancelRequest) {
+ Operation op = operationsInProgress.get(messageID);
+ if (op != null) {
+ return op.cancel(cancelRequest);
+ }
+
+ // See if the operation is in the list of persistent searches.
+ for (PersistentSearch ps : getPersistentSearches()) {
+ if (ps.getMessageID() == messageID) {
+ // We only need to find the first persistent search
+ // associated with the provided message ID. The persistent search
+ // will ensure that all other related persistent searches are cancelled.
+ return ps.cancel();
+ }
+ }
+ return new CancelResult(ResultCode.NO_SUCH_OPERATION, null);
+ }
+
+ /**
+ * Attempts to cancel all operations in progress on this connection.
+ *
+ * @param cancelRequest
+ * An object providing additional information about how the cancel should be processed.
+ */
+ @Override
+ public void cancelAllOperations(CancelRequest cancelRequest) {
+ // Make sure that no one can add any new operations.
+ synchronized (opsInProgressLock) {
+ try {
+ for (Operation o : operationsInProgress.values()) {
+ try {
+ o.abort(cancelRequest);
+
+ // TODO: Assume its cancelled?
+ if (keepStats) {
+ statTracker.updateAbandonedOperation();
+ }
+ } catch (Exception e) {
+ logger.traceException(e);
+ }
+ }
+
+ if (!operationsInProgress.isEmpty() || !getPersistentSearches().isEmpty()) {
+ lastCompletionTime.set(TimeThread.getTime());
+ }
+
+ operationsInProgress.clear();
+
+ for (PersistentSearch persistentSearch : getPersistentSearches()) {
+ persistentSearch.cancel();
+ }
+ } catch (Exception e) {
+ logger.traceException(e);
+ }
+ }
+ }
+
+ /**
+ * Attempts to cancel all operations in progress on this connection except the operation with the specified message
+ * ID.
+ *
+ * @param cancelRequest
+ * An object providing additional information about how the cancel should be processed.
+ * @param messageID
+ * The message ID of the operation that should not be canceled.
+ */
+ @Override
+ public void cancelAllOperationsExcept(CancelRequest cancelRequest, int messageID) {
+ // Make sure that no one can add any new operations.
+ synchronized (opsInProgressLock) {
+ try {
+ for (int msgID : operationsInProgress.keySet()) {
+ if (msgID == messageID) {
+ continue;
+ }
+
+ Operation o = operationsInProgress.get(msgID);
+ if (o != null) {
+ try {
+ o.abort(cancelRequest);
+
+ // TODO: Assume its cancelled?
+ if (keepStats) {
+ statTracker.updateAbandonedOperation();
+ }
+ } catch (Exception e) {
+ logger.traceException(e);
+ }
+ }
+
+ operationsInProgress.remove(msgID);
+ lastCompletionTime.set(TimeThread.getTime());
+ }
+
+ for (PersistentSearch persistentSearch : getPersistentSearches()) {
+ if (persistentSearch.getMessageID() == messageID) {
+ continue;
+ }
+
+ persistentSearch.cancel();
+ lastCompletionTime.set(TimeThread.getTime());
+ }
+ } catch (Exception e) {
+ logger.traceException(e);
+ }
+ }
+ }
+
+ @Override
+ public Selector getWriteSelector() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMaxBlockedWriteTimeLimit() {
+ return connectionHandler.getMaxBlockedWriteTimeLimit();
+ }
+
+ /**
+ * Returns the total number of operations initiated on this connection.
+ *
+ * @return the total number of operations on this connection
+ */
+ @Override
+ public long getNumberOfOperations() {
+ return operationsPerformed.get();
+ }
+
+ /**
+ * Processes the provided LDAP message read from the client and takes whatever action is appropriate. For most
+ * requests, this will include placing the operation in the work queue. Certain requests (in particular, abandons
+ * and unbinds) will be processed directly.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message to process.
+ * @return <CODE>true</CODE> if the appropriate action was taken for the request, or <CODE>false</CODE> if there was
+ * a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
+ */
@Override
public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
@@ -1124,855 +957,693 @@
}, BackpressureMode.NONE).onBackpressureBuffer(64, null, BackpressureOverflowStrategy.ERROR)));
}
- private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final FlowableEmitter<Response> out)
- {
- if (keepStats)
- {
- statTracker.updateMessageRead(message);
+ private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final FlowableEmitter<Response> out) {
+ if (keepStats) {
+ statTracker.updateMessageRead(message);
+ }
+ operationsPerformed.getAndIncrement();
+
+ List<Control> opControls = message.getControls();
+
+ // FIXME -- See if there is a bind in progress. If so, then deny
+ // most kinds of operations.
+
+ // Figure out what type of operation we're dealing with based on the
+ // LDAP message. Abandon and unbind requests will be processed here.
+ // All other types of requests will be encapsulated into operations
+ // and append into the work queue to be picked up by a worker
+ // thread. Any other kinds of LDAP messages (e.g., response
+ // messages) are illegal and will result in the connection being
+ // terminated.
+ try {
+ if (bindInProgress.get()) {
+ throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_BIND_IN_PROGRESS.get());
+ } else if (startTLSInProgress.get()) {
+ throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_STARTTLS_IN_PROGRESS.get());
+ } else if (saslBindInProgress.get() && message.getProtocolOpType() != OP_TYPE_BIND_REQUEST) {
+ throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_SASLBIND_IN_PROGRESS.get());
+ }
+
+ boolean result;
+ switch (message.getProtocolOpType()) {
+ case OP_TYPE_ABANDON_REQUEST:
+ return processAbandonRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_ADD_REQUEST:
+ return processAddRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_BIND_REQUEST:
+ boolean isSaslBind =
+ message.getBindRequestProtocolOp().getAuthenticationType() == AuthenticationType.SASL;
+ bindInProgress.set(true);
+ if (isSaslBind) {
+ saslBindInProgress.set(true);
+ }
+ result = processBindRequest(queueingStrategy, message, opControls, out);
+ if (!result) {
+ bindInProgress.set(false);
+ if (isSaslBind) {
+ saslBindInProgress.set(false);
+ }
+ }
+ return result;
+ case OP_TYPE_COMPARE_REQUEST:
+ return processCompareRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_DELETE_REQUEST:
+ return processDeleteRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_EXTENDED_REQUEST:
+ boolean isStartTlsRequest = OID_START_TLS_REQUEST.equals(message.getExtendedRequestProtocolOp()
+ .getOID());
+ if (isStartTlsRequest) {
+ startTLSInProgress.set(true);
+ }
+ result = processExtendedRequest(queueingStrategy, message, opControls, out);
+ if (!result && isStartTlsRequest) {
+ startTLSInProgress.set(false);
+ }
+ return result;
+ case OP_TYPE_MODIFY_REQUEST:
+ return processModifyRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_MODIFY_DN_REQUEST:
+ return processModifyDNRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_SEARCH_REQUEST:
+ return processSearchRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_UNBIND_REQUEST:
+ return processUnbindRequest(message, opControls);
+ default:
+ LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_INVALID_REQUEST_TYPE.get(
+ message.getProtocolOpName(), message.getMessageID());
+ disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
+ return false;
+ }
+ } catch (Exception e) {
+ logger.traceException(e);
+
+ LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_PROCESSING_FAILURE.get(message.getProtocolOpName(),
+ message.getMessageID(), e);
+ disconnect(DisconnectReason.SERVER_ERROR, true, msg);
+ return false;
+ }
}
- operationsPerformed.getAndIncrement();
- List<Control> opControls = message.getControls();
+ /**
+ * Processes the provided LDAP message as an abandon request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the abandon request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processAbandonRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ disconnectControlsNotAllowed();
+ return false;
+ }
- // FIXME -- See if there is a bind in progress. If so, then deny
- // most kinds of operations.
+ // Create the abandon operation and add it into the work queue.
+ AbandonRequestProtocolOp protocolOp = message.getAbandonRequestProtocolOp();
+ AbandonOperationBasis abandonOp = new AbandonOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getIDToAbandon());
+ abandonOp.setAttachment(REACTIVE_OUT, out);
- // Figure out what type of operation we're dealing with based on the
- // LDAP message. Abandon and unbind requests will be processed here.
- // All other types of requests will be encapsulated into operations
- // and append into the work queue to be picked up by a worker
- // thread. Any other kinds of LDAP messages (e.g., response
- // messages) are illegal and will result in the connection being
- // terminated.
- try
- {
- if (bindInProgress.get())
- {
- throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_BIND_IN_PROGRESS.get());
- }
- else if (startTLSInProgress.get())
- {
- throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_STARTTLS_IN_PROGRESS.get());
- }
- else if (saslBindInProgress.get() && message.getProtocolOpType() != OP_TYPE_BIND_REQUEST)
- {
- throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_SASLBIND_IN_PROGRESS.get());
- }
+ try {
+ addOperationInProgress(queueingStrategy, abandonOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
- boolean result;
- switch (message.getProtocolOpType())
- {
- case OP_TYPE_ABANDON_REQUEST:
- return processAbandonRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_ADD_REQUEST:
- return processAddRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_BIND_REQUEST:
- boolean isSaslBind = message.getBindRequestProtocolOp().getAuthenticationType() == AuthenticationType.SASL;
- bindInProgress.set(true);
- if (isSaslBind)
- {
- saslBindInProgress.set(true);
+ // Don't send an error response since abandon operations
+ // don't have a response.
}
- result = processBindRequest(queueingStrategy, message, opControls, out);
- if(!result)
- {
- bindInProgress.set(false);
- if (isSaslBind)
- {
- saslBindInProgress.set(false);
- }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as an add request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the add request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processAddRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
}
- return result;
- case OP_TYPE_COMPARE_REQUEST:
- return processCompareRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_DELETE_REQUEST:
- return processDeleteRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_EXTENDED_REQUEST:
- boolean isStartTlsRequest = OID_START_TLS_REQUEST.equals(message.getExtendedRequestProtocolOp().getOID());
- if (isStartTlsRequest)
- {
- startTLSInProgress.set(true);
+
+ // Create the add operation and add it into the work queue.
+ AddRequestProtocolOp protocolOp = message.getAddRequestProtocolOp();
+ AddOperationBasis addOp = new AddOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getAttributes());
+ addOp.setAttachment(REACTIVE_OUT, out);
+
+ try {
+ addOperationInProgress(queueingStrategy, addOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ final Result result = Responses.newResult(de.getResultCode())
+ .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
+ for (String referral : de.getReferralURLs()) {
+ result.addReferralURI(referral);
+ }
+
+ out.onNext(result);
+ out.onComplete();
}
- result = processExtendedRequest(queueingStrategy, message, opControls, out);
- if (!result && isStartTlsRequest)
- {
- startTLSInProgress.set(false);
+
+ return connectionValid;
+ }
+
+ private void disconnectControlsNotAllowed() {
+ disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
+ }
+
+ /**
+ * Processes the provided LDAP message as a bind request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the bind request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processBindRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ BindRequestProtocolOp protocolOp = message.getBindRequestProtocolOp();
+
+ // See if this is an LDAPv2 bind request, and if so whether that
+ // should be allowed.
+ String versionString;
+ switch (ldapVersion = protocolOp.getProtocolVersion()) {
+ case 2:
+ versionString = "2";
+
+ if (!connectionHandler.allowLDAPv2()) {
+ out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
+ return false;
+ }
+
+ if (!controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
+ }
+
+ break;
+ case 3:
+ versionString = "3";
+ break;
+ default:
+ // Unsupported protocol version. RFC4511 states that we MUST send
+ // a protocol error back to the client.
+ out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion).toString()));
+ out.onComplete();
+ disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion));
+ return false;
}
- return result;
- case OP_TYPE_MODIFY_REQUEST:
- return processModifyRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_MODIFY_DN_REQUEST:
- return processModifyDNRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_SEARCH_REQUEST:
- return processSearchRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_UNBIND_REQUEST:
- return processUnbindRequest(message, opControls);
- default:
- LocalizableMessage msg =
- ERR_LDAP_DISCONNECT_DUE_TO_INVALID_REQUEST_TYPE.get(message
- .getProtocolOpName(), message.getMessageID());
- disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
+
+ ByteString bindDN = protocolOp.getDN();
+
+ BindOperationBasis bindOp;
+ switch (protocolOp.getAuthenticationType()) {
+ case SIMPLE:
+ bindOp = new BindOperationBasis(this, nextOperationID.getAndIncrement(), message.getMessageID(), controls,
+ versionString, bindDN, protocolOp.getSimplePassword());
+ break;
+ case SASL:
+ bindOp = new BindOperationBasis(this, nextOperationID.getAndIncrement(), message.getMessageID(), controls,
+ versionString, bindDN, protocolOp.getSASLMechanism(), protocolOp.getSASLCredentials());
+ break;
+ default:
+ // This is an invalid authentication type, and therefore a
+ // protocol error. As per RFC 2251, a protocol error in a bind
+ // request must result in terminating the connection.
+ LocalizableMessage msg = ERR_LDAP_INVALID_BIND_AUTH_TYPE.get(message.getMessageID(),
+ protocolOp.getAuthenticationType());
+ disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
+ return false;
+ }
+
+ // Add the operation into the work queue.
+ bindOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, bindOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ final Result result = Responses.newBindResult(de.getResultCode())
+ .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
+ for (String referral : de.getReferralURLs()) {
+ result.addReferralURI(referral);
+ }
+
+ out.onNext(result);
+ out.onComplete();
+
+ // If it was a protocol error, then terminate the connection.
+ if (de.getResultCode() == ResultCode.PROTOCOL_ERROR) {
+ LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_BIND_PROTOCOL_ERROR.get(message.getMessageID(),
+ de.getMessageObject());
+ disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
+ }
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as a compare request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the compare request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processCompareRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
+ }
+
+ CompareRequestProtocolOp protocolOp = message.getCompareRequestProtocolOp();
+ CompareOperationBasis compareOp = new CompareOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getAttributeType(),
+ protocolOp.getAssertionValue());
+
+ // Add the operation into the work queue.
+ compareOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, compareOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ final CompareResult result = Responses.newCompareResult(de.getResultCode())
+ .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
+ result.getReferralURIs().addAll(de.getReferralURLs());
+ out.onNext(result);
+ out.onComplete();
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as a delete request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the delete request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processDeleteRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
+ }
+
+ DeleteRequestProtocolOp protocolOp = message.getDeleteRequestProtocolOp();
+ DeleteOperationBasis deleteOp = new DeleteOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getDN());
+
+ // Add the operation into the work queue.
+ deleteOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, deleteOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ final Result result = Responses.newResult(de.getResultCode())
+ .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
+ result.getReferralURIs().addAll(de.getReferralURLs());
+
+ out.onNext(result);
+ out.onComplete();
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as an extended request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the extended request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processExtendedRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ // See if this is an LDAPv2 client. If it is, then they should not
+ // be issuing extended requests. We can't send a response that we
+ // can be sure they can understand, so we have no choice but to
+ // close the connection.
+ if (ldapVersion == 2) {
+ // LDAPv2 clients aren't allowed to send controls.
+ LocalizableMessage msg = ERR_LDAPV2_EXTENDED_REQUEST_NOT_ALLOWED.get(getConnectionID(),
+ message.getMessageID());
+
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(msg.toString()));
+ out.onComplete();
+
+ logger.error(msg);
+ disconnect(DisconnectReason.PROTOCOL_ERROR, false, msg);
+ return false;
+ }
+
+ // FIXME -- Do we need to handle certain types of request here?
+ // -- StartTLS requests
+ // -- Cancel requests
+
+ ExtendedRequestProtocolOp protocolOp = message.getExtendedRequestProtocolOp();
+ ExtendedOperationBasis extendedOp = new ExtendedOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getOID(), protocolOp.getValue());
+
+ // Add the operation into the work queue.
+ extendedOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, extendedOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+ final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
+ .setMatchedDN(de.getMatchedDN().toString());
+ result.getReferralURIs().addAll(de.getReferralURLs());
+
+ out.onNext(result);
+ out.onComplete();
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as a modify request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the modify request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processModifyRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
+ }
+
+ ModifyRequestProtocolOp protocolOp = message.getModifyRequestProtocolOp();
+ ModifyOperationBasis modifyOp = new ModifyOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getModifications());
+
+ // Add the operation into the work queue.
+ modifyOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, modifyOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+ final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
+ .setMatchedDN(de.getMatchedDN().toString());
+ result.getReferralURIs().addAll(de.getReferralURLs());
+
+ out.onNext(result);
+ out.onComplete();
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as a modify DN request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the modify DN request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processModifyDNRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
+ }
+
+ ModifyDNRequestProtocolOp protocolOp = message.getModifyDNRequestProtocolOp();
+ ModifyDNOperationBasis modifyDNOp = new ModifyDNOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getEntryDN(), protocolOp.getNewRDN(),
+ protocolOp.deleteOldRDN(), protocolOp.getNewSuperior());
+
+ // Add the operation into the work queue.
+ modifyDNOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, modifyDNOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
+ .setMatchedDN(de.getMatchedDN().toString());
+ result.getReferralURIs().addAll(de.getReferralURLs());
+ for (Control control : modifyDNOp.getResponseControls()) {
+ result.addControl(Converters.from(control));
+ }
+ out.onNext(result);
+ out.onComplete();
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as a search request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the search request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processSearchRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
+ }
+
+ SearchRequestProtocolOp protocolOp = message.getSearchRequestProtocolOp();
+ SearchOperationBasis searchOp = new SearchOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getBaseDN(), protocolOp.getScope(),
+ protocolOp.getDereferencePolicy(), protocolOp.getSizeLimit(), protocolOp.getTimeLimit(),
+ protocolOp.getTypesOnly(), protocolOp.getFilter(), protocolOp.getAttributes());
+
+ // Add the operation into the work queue.
+ searchOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, searchOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ final Result result = Responses.newResult(de.getResultCode());
+ if (de.getMessage() != null) {
+ result.setDiagnosticMessage(de.getMessage());
+ }
+ if (de.getMatchedDN() != null) {
+ result.setMatchedDN(de.getMatchedDN().toString());
+ }
+ if (de.getReferralURLs() != null) {
+ result.getReferralURIs().addAll(de.getReferralURLs());
+ }
+ if (searchOp.getResponseControls() != null) {
+ for (Control control : searchOp.getResponseControls()) {
+ result.addControl(Converters.from(control));
+ }
+ }
+ out.onNext(result);
+ out.onComplete();
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as an unbind request.
+ *
+ * @param message
+ * The LDAP message containing the unbind request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processUnbindRequest(final LDAPMessage message, final List<Control> controls) {
+ UnbindOperationBasis unbindOp = new UnbindOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls);
+
+ unbindOp.run();
+
+ // The client connection will never be valid after an unbind.
return false;
- }
- }
- catch (Exception e)
- {
- logger.traceException(e);
-
- LocalizableMessage msg =
- ERR_LDAP_DISCONNECT_DUE_TO_PROCESSING_FAILURE.get(message
- .getProtocolOpName(), message.getMessageID(), e);
- disconnect(DisconnectReason.SERVER_ERROR, true, msg);
- return false;
- }
- }
-
- /**
- * Processes the provided LDAP message as an abandon request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the abandon request to
- * process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processAbandonRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- disconnectControlsNotAllowed();
- return false;
}
- // Create the abandon operation and add it into the work queue.
- AbandonRequestProtocolOp protocolOp =
- message.getAbandonRequestProtocolOp();
- AbandonOperationBasis abandonOp =
- new AbandonOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getIDToAbandon());
- abandonOp.setAttachment(REACTIVE_OUT, out);
+ @Override
+ public String getMonitorSummary() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("connID=\"");
+ buffer.append(connectionID);
+ buffer.append("\" connectTime=\"");
+ buffer.append(getConnectTimeString());
+ buffer.append("\" source=\"");
+ buffer.append(clientAddress);
+ buffer.append(":");
+ buffer.append(clientPort);
+ buffer.append("\" destination=\"");
+ buffer.append(serverAddress);
+ buffer.append(":");
+ buffer.append(connectionHandler.getListeners().iterator().next().getPort());
+ buffer.append("\" ldapVersion=\"");
+ buffer.append(ldapVersion);
+ buffer.append("\" authDN=\"");
- try
- {
- addOperationInProgress(queueingStrategy, abandonOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- // Don't send an error response since abandon operations
- // don't have a response.
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as an add request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the add request to process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processAddRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- // Create the add operation and add it into the work queue.
- AddRequestProtocolOp protocolOp = message.getAddRequestProtocolOp();
- AddOperationBasis addOp =
- new AddOperationBasis(this, nextOperationID.getAndIncrement(),
- message.getMessageID(), controls, protocolOp.getDN(),
- protocolOp.getAttributes());
- addOp.setAttachment(REACTIVE_OUT, out);
-
- try
- {
- addOperationInProgress(queueingStrategy, addOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- final Result result = Responses.newResult(de.getResultCode())
- .setDiagnosticMessage(de.getLocalizedMessage())
- .setMatchedDN(de.getMatchedDN().toString());
- for(String referral : de.getReferralURLs()) {
- result.addReferralURI(referral);
- }
-
- out.onNext(result);
- out.onComplete();
- }
-
- return connectionValid;
- }
-
- private void disconnectControlsNotAllowed()
- {
- disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
- }
-
- /**
- * Processes the provided LDAP message as a bind request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the bind request to process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processBindRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- BindRequestProtocolOp protocolOp =
- message.getBindRequestProtocolOp();
-
- // See if this is an LDAPv2 bind request, and if so whether that
- // should be allowed.
- String versionString;
- switch (ldapVersion = protocolOp.getProtocolVersion())
- {
- case 2:
- versionString = "2";
-
- if (!connectionHandler.allowLDAPv2())
- {
- out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
- return false;
- }
-
- if (!controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- break;
- case 3:
- versionString = "3";
- break;
- default:
- // Unsupported protocol version. RFC4511 states that we MUST send
- // a protocol error back to the client.
- out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion).toString()));
- out.onComplete();
- disconnect(DisconnectReason.PROTOCOL_ERROR, false,
- ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion));
- return false;
- }
-
- ByteString bindDN = protocolOp.getDN();
-
- BindOperationBasis bindOp;
- switch (protocolOp.getAuthenticationType())
- {
- case SIMPLE:
- bindOp =
- new BindOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- versionString, bindDN, protocolOp.getSimplePassword());
- break;
- case SASL:
- bindOp =
- new BindOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- versionString, bindDN, protocolOp.getSASLMechanism(),
- protocolOp.getSASLCredentials());
- break;
- default:
- // This is an invalid authentication type, and therefore a
- // protocol error. As per RFC 2251, a protocol error in a bind
- // request must result in terminating the connection.
- LocalizableMessage msg =
- ERR_LDAP_INVALID_BIND_AUTH_TYPE.get(message.getMessageID(),
- protocolOp.getAuthenticationType());
- disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
- return false;
- }
-
- // Add the operation into the work queue.
- bindOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, bindOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- final Result result = Responses.newBindResult(de.getResultCode())
- .setDiagnosticMessage(de.getLocalizedMessage())
- .setMatchedDN(de.getMatchedDN().toString());
- for(String referral : de.getReferralURLs())
- {
- result.addReferralURI(referral);
- }
-
- out.onNext(result);
- out.onComplete();
-
- // If it was a protocol error, then terminate the connection.
- if (de.getResultCode() == ResultCode.PROTOCOL_ERROR)
- {
- LocalizableMessage msg =
- ERR_LDAP_DISCONNECT_DUE_TO_BIND_PROTOCOL_ERROR.get(message
- .getMessageID(), de.getMessageObject());
- disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
- }
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as a compare request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the compare request to
- * process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processCompareRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- CompareRequestProtocolOp protocolOp =
- message.getCompareRequestProtocolOp();
- CompareOperationBasis compareOp =
- new CompareOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getDN(), protocolOp.getAttributeType(),
- protocolOp.getAssertionValue());
-
- // Add the operation into the work queue.
- compareOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, compareOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- final CompareResult result = Responses.newCompareResult(de.getResultCode())
- .setDiagnosticMessage(de.getLocalizedMessage())
- .setMatchedDN(de.getMatchedDN().toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
- out.onNext(result);
- out.onComplete();
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as a delete request.
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the delete request to process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processDeleteRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage( ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- DeleteRequestProtocolOp protocolOp =
- message.getDeleteRequestProtocolOp();
- DeleteOperationBasis deleteOp =
- new DeleteOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getDN());
-
- // Add the operation into the work queue.
- deleteOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, deleteOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- final Result result = Responses.newResult(de.getResultCode())
- .setDiagnosticMessage(de.getLocalizedMessage())
- .setMatchedDN(de.getMatchedDN().toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
-
- out.onNext(result);
- out.onComplete();
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as an extended request.
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the extended request to
- * process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processExtendedRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- // See if this is an LDAPv2 client. If it is, then they should not
- // be issuing extended requests. We can't send a response that we
- // can be sure they can understand, so we have no choice but to
- // close the connection.
- if (ldapVersion == 2)
- {
- // LDAPv2 clients aren't allowed to send controls.
- LocalizableMessage msg = ERR_LDAPV2_EXTENDED_REQUEST_NOT_ALLOWED.get(getConnectionID(), message.getMessageID());
-
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(msg.toString()));
- out.onComplete();
-
- logger.error(msg);
- disconnect(DisconnectReason.PROTOCOL_ERROR, false, msg);
- return false;
- }
-
- // FIXME -- Do we need to handle certain types of request here?
- // -- StartTLS requests
- // -- Cancel requests
-
- ExtendedRequestProtocolOp protocolOp =
- message.getExtendedRequestProtocolOp();
- ExtendedOperationBasis extendedOp =
- new ExtendedOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getOID(), protocolOp.getValue());
-
- // Add the operation into the work queue.
- extendedOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, extendedOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
- final Result result = Responses.newResult(de.getResultCode())
- .setDiagnosticMessage(de.getMessage())
- .setMatchedDN(de.getMatchedDN().toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
-
- out.onNext(result);
- out.onComplete();
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as a modify request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the modify request to process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processModifyRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- ModifyRequestProtocolOp protocolOp =
- message.getModifyRequestProtocolOp();
- ModifyOperationBasis modifyOp =
- new ModifyOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getDN(), protocolOp.getModifications());
-
- // Add the operation into the work queue.
- modifyOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, modifyOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
- final Result result =
- Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage()).setMatchedDN(de.getMatchedDN()
- .toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
-
- out.onNext(result);
- out.onComplete();
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as a modify DN request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the modify DN request to
- * process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processModifyDNRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- ModifyDNRequestProtocolOp protocolOp =
- message.getModifyDNRequestProtocolOp();
- ModifyDNOperationBasis modifyDNOp =
- new ModifyDNOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getEntryDN(), protocolOp.getNewRDN(), protocolOp
- .deleteOldRDN(), protocolOp.getNewSuperior());
-
- // Add the operation into the work queue.
- modifyDNOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, modifyDNOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- final Result result =
- Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage()).setMatchedDN(de.getMatchedDN()
- .toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
- for(Control control : modifyDNOp.getResponseControls()) {
- result.addControl(Converters.from(control));
- }
- out.onNext(result);
- out.onComplete();
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as a search request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the search request to process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processSearchRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- SearchRequestProtocolOp protocolOp =
- message.getSearchRequestProtocolOp();
- SearchOperationBasis searchOp =
- new SearchOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getBaseDN(), protocolOp.getScope(), protocolOp
- .getDereferencePolicy(), protocolOp.getSizeLimit(),
- protocolOp.getTimeLimit(), protocolOp.getTypesOnly(),
- protocolOp.getFilter(), protocolOp.getAttributes());
-
- // Add the operation into the work queue.
- searchOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, searchOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- final Result result = Responses.newResult(de.getResultCode());
- if (de.getMessage() != null)
- {
- result.setDiagnosticMessage(de.getMessage());
- }
- if (de.getMatchedDN() != null)
- {
- result.setMatchedDN(de.getMatchedDN().toString());
- }
- if (de.getReferralURLs() != null)
- {
- result.getReferralURIs().addAll(de.getReferralURLs());
- }
- if (searchOp.getResponseControls() != null)
- {
- for (Control control : searchOp.getResponseControls())
- {
- result.addControl(Converters.from(control));
+ DN authDN = getAuthenticationInfo().getAuthenticationDN();
+ if (authDN != null) {
+ buffer.append(authDN);
}
- }
- out.onNext(result);
- out.onComplete();
+
+ buffer.append("\" security=\"");
+ buffer.append("none");
+
+ buffer.append("\" opsInProgress=\"");
+ buffer.append(operationsInProgress.size());
+ buffer.append("\"");
+
+ int countPSearch = getPersistentSearches().size();
+ if (countPSearch > 0) {
+ buffer.append(" persistentSearches=\"");
+ buffer.append(countPSearch);
+ buffer.append("\"");
+ }
+ return buffer.toString();
}
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as an unbind request.
- *
- * @param message
- * The LDAP message containing the unbind request to process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processUnbindRequest(final LDAPMessage message, final List<Control> controls)
- {
- UnbindOperationBasis unbindOp =
- new UnbindOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls);
-
- unbindOp.run();
-
- // The client connection will never be valid after an unbind.
- return false;
- }
-
- @Override
- public String getMonitorSummary()
- {
- StringBuilder buffer = new StringBuilder();
- buffer.append("connID=\"");
- buffer.append(connectionID);
- buffer.append("\" connectTime=\"");
- buffer.append(getConnectTimeString());
- buffer.append("\" source=\"");
- buffer.append(clientAddress);
- buffer.append(":");
- buffer.append(clientPort);
- buffer.append("\" destination=\"");
- buffer.append(serverAddress);
- buffer.append(":");
- buffer.append(connectionHandler.getListeners().iterator().next().getPort());
- buffer.append("\" ldapVersion=\"");
- buffer.append(ldapVersion);
- buffer.append("\" authDN=\"");
-
- DN authDN = getAuthenticationInfo().getAuthenticationDN();
- if (authDN != null)
- {
- buffer.append(authDN);
+ /**
+ * Appends a string representation of this client connection to the provided buffer.
+ *
+ * @param buffer
+ * The buffer to which the information should be appended.
+ */
+ @Override
+ public void toString(StringBuilder buffer) {
+ buffer.append("LDAP client connection from ");
+ buffer.append(clientAddress);
+ buffer.append(":");
+ buffer.append(clientPort);
+ buffer.append(" to ");
+ buffer.append(serverAddress);
+ buffer.append(":");
+ buffer.append(serverPort);
}
- buffer.append("\" security=\"");
- buffer.append("none");
-
- buffer.append("\" opsInProgress=\"");
- buffer.append(operationsInProgress.size());
- buffer.append("\"");
-
- int countPSearch = getPersistentSearches().size();
- if (countPSearch > 0)
- {
- buffer.append(" persistentSearches=\"");
- buffer.append(countPSearch);
- buffer.append("\"");
+ @Override
+ public boolean prepareTLS(LocalizableMessageBuilder unavailableReason) {
+ throw new UnsupportedOperationException();
}
- return buffer.toString();
- }
- /**
- * Appends a string representation of this client connection to the
- * provided buffer.
- *
- * @param buffer
- * The buffer to which the information should be appended.
- */
- @Override
- public void toString(StringBuilder buffer)
- {
- buffer.append("LDAP client connection from ");
- buffer.append(clientAddress);
- buffer.append(":");
- buffer.append(clientPort);
- buffer.append(" to ");
- buffer.append(serverAddress);
- buffer.append(":");
- buffer.append(serverPort);
- }
-
- @Override
- public boolean prepareTLS(LocalizableMessageBuilder unavailableReason)
- {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Retrieves the length of time in milliseconds that this client
- * connection has been idle. <BR>
- * <BR>
- * Note that the default implementation will always return zero.
- * Subclasses associated with connection handlers should override this
- * method if they wish to provided idle time limit functionality.
- *
- * @return The length of time in milliseconds that this client
- * connection has been idle.
- */
- @Override
- public long getIdleTime()
- {
- if (operationsInProgress.isEmpty()
- && getPersistentSearches().isEmpty())
- {
- return TimeThread.getTime() - lastCompletionTime.get();
+ /**
+ * Retrieves the length of time in milliseconds that this client connection has been idle. <BR>
+ * <BR>
+ * Note that the default implementation will always return zero. Subclasses associated with connection handlers
+ * should override this method if they wish to provided idle time limit functionality.
+ *
+ * @return The length of time in milliseconds that this client connection has been idle.
+ */
+ @Override
+ public long getIdleTime() {
+ if (operationsInProgress.isEmpty() && getPersistentSearches().isEmpty()) {
+ return TimeThread.getTime() - lastCompletionTime.get();
+ } else {
+ // There's at least one operation in progress, so it's not idle.
+ return 0L;
+ }
}
- else
- {
- // There's at least one operation in progress, so it's not idle.
- return 0L;
+
+ /**
+ * Return the certificate chain array associated with a connection.
+ *
+ * @return The array of certificates associated with a connection.
+ */
+ public Certificate[] getClientCertificateChain() {
+ return new Certificate[0];
}
- }
- /**
- * Return the certificate chain array associated with a connection.
- *
- * @return The array of certificates associated with a connection.
- */
- public Certificate[] getClientCertificateChain()
- {
- return new Certificate[0];
- }
-
- @Override
- public int getSSF()
- {
- return 0;
- }
+ @Override
+ public int getSSF() {
+ return 0;
+ }
}
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
index 5e71d6c..753c8da 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -92,1025 +92,859 @@
import com.forgerock.reactive.Stream;
/**
- * This class defines a connection handler that will be used for communicating
- * with clients over LDAP. It is actually implemented in two parts: as a
- * connection handler and one or more request handlers. The connection handler
- * is responsible for accepting new connections and registering each of them
- * with a request handler. The request handlers then are responsible for reading
- * requests from the clients and parsing them as operations. A single request
- * handler may be used, but having multiple handlers might provide better
- * performance in a multi-CPU system.
+ * This class defines a connection handler that will be used for communicating with clients over LDAP. It is actually
+ * implemented in two parts: as a connection handler and one or more request handlers. The connection handler is
+ * responsible for accepting new connections and registering each of them with a request handler. The request handlers
+ * then are responsible for reading requests from the clients and parsing them as operations. A single request handler
+ * may be used, but having multiple handlers might provide better performance in a multi-CPU system.
*/
-public final class LDAPConnectionHandler2 extends
- ConnectionHandler<LDAPConnectionHandlerCfg> implements
- ConfigurationChangeListener<LDAPConnectionHandlerCfg>,
- ServerShutdownListener, AlertGenerator
-{
- /** Task run periodically by the connection finalizer. */
- private final class ConnectionFinalizerRunnable implements Runnable
- {
+public final class LDAPConnectionHandler2 extends ConnectionHandler<LDAPConnectionHandlerCfg> implements
+ ConfigurationChangeListener<LDAPConnectionHandlerCfg>, ServerShutdownListener, AlertGenerator {
+ /** Task run periodically by the connection finalizer. */
+ private final class ConnectionFinalizerRunnable implements Runnable {
+ @Override
+ public void run() {
+ if (!connectionFinalizerActiveJobQueue.isEmpty()) {
+ for (Runnable r : connectionFinalizerActiveJobQueue) {
+ r.run();
+ }
+ connectionFinalizerActiveJobQueue.clear();
+ }
+
+ // Switch the lists.
+ synchronized (connectionFinalizerLock) {
+ List<Runnable> tmp = connectionFinalizerActiveJobQueue;
+ connectionFinalizerActiveJobQueue = connectionFinalizerPendingJobQueue;
+ connectionFinalizerPendingJobQueue = tmp;
+ }
+ }
+ }
+
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+ /** Default friendly name for the LDAP connection handler. */
+ private static final String DEFAULT_FRIENDLY_NAME = "LDAP Connection Handler";
+
+ /** SSL instance name used in context creation. */
+ private static final String SSL_CONTEXT_INSTANCE_NAME = "TLS";
+
+ private GrizzlyLDAPListener listener;
+
+ /** The current configuration state. */
+ private LDAPConnectionHandlerCfg currentConfig;
+
+ /* Properties that cannot be modified dynamically */
+
+ /** The set of addresses on which to listen for new connections. */
+ private Set<InetSocketAddress> listenAddresses;
+
+ /** The SSL client auth policy used by this connection handler. */
+ private SSLClientAuthPolicy sslClientAuthPolicy;
+
+ /** The backlog that will be used for the accept queue. */
+ private int backlog;
+
+ /** Indicates whether to allow the reuse address socket option. */
+ private boolean allowReuseAddress;
+
+ /** Indicates whether the Directory Server is in the process of shutting down. */
+ private volatile boolean shutdownRequested;
+
+ /* Internal LDAP connection handler state */
+
+ /** Indicates whether this connection handler is enabled. */
+ private boolean enabled;
+
+ /** The set of clients that are explicitly allowed access to the server. */
+ private Collection<AddressMask> allowedClients;
+
+ /** The set of clients that have been explicitly denied access to the server. */
+ private Collection<AddressMask> deniedClients;
+
+ /** The set of listeners for this connection handler. */
+ private List<HostPort> listeners;
+
+ /** The set of statistics collected for this connection handler. */
+ private LDAPStatistics statTracker;
+
+ /** The client connection monitor provider associated with this connection handler. */
+ private ClientConnectionMonitorProvider connMonitor;
+
+ /** The unique name assigned to this connection handler. */
+ private String handlerName;
+
+ /** The protocol used by this connection handler. */
+ private String protocol;
+
+ /** Queueing strategy. */
+ private final QueueingStrategy queueingStrategy;
+
+ /**
+ * The condition variable that will be used by the start method to wait for the socket port to be opened and ready
+ * to process requests before returning.
+ */
+ private final Object waitListen = new Object();
+
+ /** The friendly name of this connection handler. */
+ private String friendlyName;
+
+ /**
+ * SSL context.
+ *
+ * @see LDAPConnectionHandler2#sslEngine
+ */
+ private SSLContext sslContext;
+
+ /** The SSL engine is used for obtaining default SSL parameters. */
+ private SSLEngine sslEngine;
+
+ /**
+ * Connection finalizer thread.
+ * <p>
+ * This thread is defers closing clients for approximately 100ms. This gives the client a chance to close the
+ * connection themselves before the server thus avoiding leaving the server side in the TIME WAIT state.
+ */
+ private final Object connectionFinalizerLock = new Object();
+ private ScheduledExecutorService connectionFinalizer;
+ private List<Runnable> connectionFinalizerActiveJobQueue;
+ private List<Runnable> connectionFinalizerPendingJobQueue;
+
+ private final List<ClientConnection> connectionList = Collections
+ .synchronizedList(new ArrayList<ClientConnection>());
+
+ /**
+ * Creates a new instance of this LDAP connection handler. It must be initialized before it may be used.
+ */
+ public LDAPConnectionHandler2() {
+ this(new WorkQueueStrategy(), null); // Use name from configuration.
+ }
+
+ /**
+ * Creates a new instance of this LDAP connection handler, using a queueing strategy. It must be initialized before
+ * it may be used.
+ *
+ * @param strategy
+ * Request handling strategy.
+ * @param friendlyName
+ * The name of of this connection handler, or {@code null} if the name should be taken from the
+ * configuration.
+ */
+ public LDAPConnectionHandler2(QueueingStrategy strategy, String friendlyName) {
+ super(friendlyName != null ? friendlyName : DEFAULT_FRIENDLY_NAME + " Thread");
+
+ this.friendlyName = friendlyName;
+ this.queueingStrategy = strategy;
+ }
+
+ /**
+ * Indicates whether this connection handler should allow interaction with LDAPv2 clients.
+ *
+ * @return <CODE>true</CODE> if LDAPv2 is allowed, or <CODE>false</CODE> if not.
+ */
+ public boolean allowLDAPv2() {
+ return currentConfig.isAllowLDAPV2();
+ }
+
+ /**
+ * Indicates whether this connection handler should allow the use of the StartTLS extended operation.
+ *
+ * @return <CODE>true</CODE> if StartTLS is allowed, or <CODE>false</CODE> if not.
+ */
+ public boolean allowStartTLS() {
+ return currentConfig.isAllowStartTLS() && !currentConfig.isUseSSL();
+ }
+
@Override
- public void run()
- {
- if (!connectionFinalizerActiveJobQueue.isEmpty())
- {
- for (Runnable r : connectionFinalizerActiveJobQueue)
- {
- r.run();
+ public ConfigChangeResult applyConfigurationChange(LDAPConnectionHandlerCfg config) {
+ final ConfigChangeResult ccr = new ConfigChangeResult();
+
+ // Note that the following properties cannot be modified:
+ // * listen port and addresses
+ // * use ssl
+ // * ssl policy
+ // * ssl cert nickname
+ // * accept backlog
+ // * tcp reuse address
+ // * num request handler
+
+ // Clear the stat tracker if LDAPv2 is being enabled.
+ if (currentConfig.isAllowLDAPV2() != config.isAllowLDAPV2() && config.isAllowLDAPV2()) {
+ statTracker.clearStatistics();
}
- connectionFinalizerActiveJobQueue.clear();
- }
- // Switch the lists.
- synchronized (connectionFinalizerLock)
- {
- List<Runnable> tmp = connectionFinalizerActiveJobQueue;
- connectionFinalizerActiveJobQueue = connectionFinalizerPendingJobQueue;
- connectionFinalizerPendingJobQueue = tmp;
- }
- }
- }
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+ // Apply the changes.
+ currentConfig = config;
+ enabled = config.isEnabled();
+ allowedClients = config.getAllowedClient();
+ deniedClients = config.getDeniedClient();
- /** Default friendly name for the LDAP connection handler. */
- private static final String DEFAULT_FRIENDLY_NAME = "LDAP Connection Handler";
+ // Reconfigure SSL if needed.
+ try {
+ configureSSL(config);
+ } catch (DirectoryException e) {
+ logger.traceException(e);
+ ccr.setResultCode(e.getResultCode());
+ ccr.addMessage(e.getMessageObject());
+ return ccr;
+ }
- /** SSL instance name used in context creation. */
- private static final String SSL_CONTEXT_INSTANCE_NAME = "TLS";
+ if (config.isAllowLDAPV2()) {
+ DirectoryServer.registerSupportedLDAPVersion(2, this);
+ } else {
+ DirectoryServer.deregisterSupportedLDAPVersion(2, this);
+ }
- private GrizzlyLDAPListener listener;
-
- /** The current configuration state. */
- private LDAPConnectionHandlerCfg currentConfig;
-
- /* Properties that cannot be modified dynamically */
-
- /** The set of addresses on which to listen for new connections. */
- private Set<InetSocketAddress> listenAddresses;
-
- /** The SSL client auth policy used by this connection handler. */
- private SSLClientAuthPolicy sslClientAuthPolicy;
-
- /** The backlog that will be used for the accept queue. */
- private int backlog;
-
- /** Indicates whether to allow the reuse address socket option. */
- private boolean allowReuseAddress;
-
- /** Indicates whether the Directory Server is in the process of shutting down. */
- private volatile boolean shutdownRequested;
-
- /* Internal LDAP connection handler state */
-
- /** Indicates whether this connection handler is enabled. */
- private boolean enabled;
-
- /** The set of clients that are explicitly allowed access to the server. */
- private Collection<AddressMask> allowedClients;
-
- /** The set of clients that have been explicitly denied access to the server. */
- private Collection<AddressMask> deniedClients;
-
- /** The set of listeners for this connection handler. */
- private List<HostPort> listeners;
-
- /** The set of statistics collected for this connection handler. */
- private LDAPStatistics statTracker;
-
- /** The client connection monitor provider associated with this connection handler. */
- private ClientConnectionMonitorProvider connMonitor;
-
- /** The unique name assigned to this connection handler. */
- private String handlerName;
-
- /** The protocol used by this connection handler. */
- private String protocol;
-
- /** Queueing strategy. */
- private final QueueingStrategy queueingStrategy;
-
- /**
- * The condition variable that will be used by the start method to wait for
- * the socket port to be opened and ready to process requests before
- * returning.
- */
- private final Object waitListen = new Object();
-
- /** The friendly name of this connection handler. */
- private String friendlyName;
-
- /**
- * SSL context.
- *
- * @see LDAPConnectionHandler2#sslEngine
- */
- private SSLContext sslContext;
-
- /** The SSL engine is used for obtaining default SSL parameters. */
- private SSLEngine sslEngine;
-
- /**
- * Connection finalizer thread.
- * <p>
- * This thread is defers closing clients for approximately 100ms. This gives
- * the client a chance to close the connection themselves before the server
- * thus avoiding leaving the server side in the TIME WAIT state.
- */
- private final Object connectionFinalizerLock = new Object();
- private ScheduledExecutorService connectionFinalizer;
- private List<Runnable> connectionFinalizerActiveJobQueue;
- private List<Runnable> connectionFinalizerPendingJobQueue;
-
- private final List<ClientConnection> connectionList = Collections.synchronizedList(new ArrayList<ClientConnection>());
-
- /**
- * Creates a new instance of this LDAP connection handler. It must be
- * initialized before it may be used.
- */
- public LDAPConnectionHandler2()
- {
- this(new WorkQueueStrategy(), null); // Use name from configuration.
- }
-
- /**
- * Creates a new instance of this LDAP connection handler, using a queueing
- * strategy. It must be initialized before it may be used.
- *
- * @param strategy
- * Request handling strategy.
- * @param friendlyName
- * The name of of this connection handler, or {@code null} if the
- * name should be taken from the configuration.
- */
- public LDAPConnectionHandler2(QueueingStrategy strategy, String friendlyName)
- {
- super(friendlyName != null ? friendlyName : DEFAULT_FRIENDLY_NAME + " Thread");
-
- this.friendlyName = friendlyName;
- this.queueingStrategy = strategy;
- }
-
- /**
- * Indicates whether this connection handler should allow interaction with
- * LDAPv2 clients.
- *
- * @return <CODE>true</CODE> if LDAPv2 is allowed, or <CODE>false</CODE> if
- * not.
- */
- public boolean allowLDAPv2()
- {
- return currentConfig.isAllowLDAPV2();
- }
-
- /**
- * Indicates whether this connection handler should allow the use of the
- * StartTLS extended operation.
- *
- * @return <CODE>true</CODE> if StartTLS is allowed, or <CODE>false</CODE> if
- * not.
- */
- public boolean allowStartTLS()
- {
- return currentConfig.isAllowStartTLS() && !currentConfig.isUseSSL();
- }
-
- @Override
- public ConfigChangeResult applyConfigurationChange(
- LDAPConnectionHandlerCfg config)
- {
- final ConfigChangeResult ccr = new ConfigChangeResult();
-
- // Note that the following properties cannot be modified:
- // * listen port and addresses
- // * use ssl
- // * ssl policy
- // * ssl cert nickname
- // * accept backlog
- // * tcp reuse address
- // * num request handler
-
- // Clear the stat tracker if LDAPv2 is being enabled.
- if (currentConfig.isAllowLDAPV2() != config.isAllowLDAPV2()
- && config.isAllowLDAPV2())
- {
- statTracker.clearStatistics();
+ return ccr;
}
- // Apply the changes.
- currentConfig = config;
- enabled = config.isEnabled();
- allowedClients = config.getAllowedClient();
- deniedClients = config.getDeniedClient();
-
- // Reconfigure SSL if needed.
- try
- {
- configureSSL(config);
- }
- catch (DirectoryException e)
- {
- logger.traceException(e);
- ccr.setResultCode(e.getResultCode());
- ccr.addMessage(e.getMessageObject());
- return ccr;
+ private void configureSSL(LDAPConnectionHandlerCfg config) throws DirectoryException {
+ protocol = config.isUseSSL() ? "LDAPS" : "LDAP";
+ if (config.isUseSSL() || config.isAllowStartTLS()) {
+ sslContext = createSSLContext(config);
+ sslEngine = createSSLEngine(config, sslContext);
+ } else {
+ sslContext = null;
+ sslEngine = null;
+ }
}
- if (config.isAllowLDAPV2())
- {
- DirectoryServer.registerSupportedLDAPVersion(2, this);
- }
- else
- {
- DirectoryServer.deregisterSupportedLDAPVersion(2, this);
+ @Override
+ public void finalizeConnectionHandler(LocalizableMessage finalizeReason) {
+ shutdownRequested = true;
+ currentConfig.removeLDAPChangeListener(this);
+
+ if (connMonitor != null) {
+ DirectoryServer.deregisterMonitorProvider(connMonitor);
+ }
+
+ if (statTracker != null) {
+ DirectoryServer.deregisterMonitorProvider(statTracker);
+ }
+
+ DirectoryServer.deregisterSupportedLDAPVersion(2, this);
+ DirectoryServer.deregisterSupportedLDAPVersion(3, this);
+
+ // Shutdown the connection finalizer and ensure that any pending
+ // unclosed connections are closed.
+ synchronized (connectionFinalizerLock) {
+ connectionFinalizer.shutdown();
+ connectionFinalizer = null;
+
+ Runnable r = new ConnectionFinalizerRunnable();
+ r.run(); // Flush active queue.
+ r.run(); // Flush pending queue.
+ }
}
- return ccr;
- }
+ /**
+ * Retrieves information about the set of alerts that this generator may produce. The map returned should be between
+ * the notification type for a particular notification and the human-readable description for that notification.
+ * This alert generator must not generate any alerts with types that are not contained in this list.
+ *
+ * @return Information about the set of alerts that this generator may produce.
+ */
+ @Override
+ public Map<String, String> getAlerts() {
+ Map<String, String> alerts = new LinkedHashMap<>();
- private void configureSSL(LDAPConnectionHandlerCfg config)
- throws DirectoryException
- {
- protocol = config.isUseSSL() ? "LDAPS" : "LDAP";
- if (config.isUseSSL() || config.isAllowStartTLS())
- {
- sslContext = createSSLContext(config);
- sslEngine = createSSLEngine(config, sslContext);
- }
- else
- {
- sslContext = null;
- sslEngine = null;
- }
- }
+ alerts.put(ALERT_TYPE_LDAP_CONNECTION_HANDLER_CONSECUTIVE_FAILURES,
+ ALERT_DESCRIPTION_LDAP_CONNECTION_HANDLER_CONSECUTIVE_FAILURES);
+ alerts.put(ALERT_TYPE_LDAP_CONNECTION_HANDLER_UNCAUGHT_ERROR,
+ ALERT_DESCRIPTION_LDAP_CONNECTION_HANDLER_UNCAUGHT_ERROR);
- @Override
- public void finalizeConnectionHandler(LocalizableMessage finalizeReason)
- {
- shutdownRequested = true;
- currentConfig.removeLDAPChangeListener(this);
-
- if (connMonitor != null)
- {
- DirectoryServer.deregisterMonitorProvider(connMonitor);
+ return alerts;
}
- if (statTracker != null)
- {
- DirectoryServer.deregisterMonitorProvider(statTracker);
+ /**
+ * Retrieves the fully-qualified name of the Java class for this alert generator implementation.
+ *
+ * @return The fully-qualified name of the Java class for this alert generator implementation.
+ */
+ @Override
+ public String getClassName() {
+ return LDAPConnectionHandler2.class.getName();
}
- DirectoryServer.deregisterSupportedLDAPVersion(2, this);
- DirectoryServer.deregisterSupportedLDAPVersion(3, this);
-
- // Shutdown the connection finalizer and ensure that any pending
- // unclosed connections are closed.
- synchronized (connectionFinalizerLock)
- {
- connectionFinalizer.shutdown();
- connectionFinalizer = null;
-
- Runnable r = new ConnectionFinalizerRunnable();
- r.run(); // Flush active queue.
- r.run(); // Flush pending queue.
- }
- }
-
- /**
- * Retrieves information about the set of alerts that this generator may
- * produce. The map returned should be between the notification type for a
- * particular notification and the human-readable description for that
- * notification. This alert generator must not generate any alerts with types
- * that are not contained in this list.
- *
- * @return Information about the set of alerts that this generator may
- * produce.
- */
- @Override
- public Map<String, String> getAlerts()
- {
- Map<String, String> alerts = new LinkedHashMap<>();
-
- alerts.put(ALERT_TYPE_LDAP_CONNECTION_HANDLER_CONSECUTIVE_FAILURES,
- ALERT_DESCRIPTION_LDAP_CONNECTION_HANDLER_CONSECUTIVE_FAILURES);
- alerts.put(ALERT_TYPE_LDAP_CONNECTION_HANDLER_UNCAUGHT_ERROR,
- ALERT_DESCRIPTION_LDAP_CONNECTION_HANDLER_UNCAUGHT_ERROR);
-
- return alerts;
- }
-
- /**
- * Retrieves the fully-qualified name of the Java class for this alert
- * generator implementation.
- *
- * @return The fully-qualified name of the Java class for this alert generator
- * implementation.
- */
- @Override
- public String getClassName()
- {
- return LDAPConnectionHandler2.class.getName();
- }
-
- /**
- * Retrieves the set of active client connections that have been established
- * through this connection handler.
- *
- * @return The set of active client connections that have been established
- * through this connection handler.
- */
- @Override
- public Collection<ClientConnection> getClientConnections()
- {
- return connectionList;
- }
-
- /**
- * Retrieves the DN of the configuration entry with which this alert generator
- * is associated.
- *
- * @return The DN of the configuration entry with which this alert generator
- * is associated.
- */
- @Override
- public DN getComponentEntryDN()
- {
- return currentConfig.dn();
- }
-
- @Override
- public String getConnectionHandlerName()
- {
- return handlerName;
- }
-
- @Override
- public Collection<String> getEnabledSSLCipherSuites()
- {
- final SSLEngine engine = sslEngine;
- if (engine != null)
- {
- return Arrays.asList(engine.getEnabledCipherSuites());
- }
- return super.getEnabledSSLCipherSuites();
- }
-
- @Override
- public Collection<String> getEnabledSSLProtocols()
- {
- final SSLEngine engine = sslEngine;
- if (engine != null)
- {
- return Arrays.asList(engine.getEnabledProtocols());
- }
- return super.getEnabledSSLProtocols();
- }
-
- @Override
- public Collection<HostPort> getListeners()
- {
- return listeners;
- }
-
- /**
- * Retrieves the maximum length of time in milliseconds that attempts to write
- * to LDAP client connections should be allowed to block.
- *
- * @return The maximum length of time in milliseconds that attempts to write
- * to LDAP client connections should be allowed to block, or zero if
- * there should not be any limit imposed.
- */
- public long getMaxBlockedWriteTimeLimit()
- {
- return currentConfig.getMaxBlockedWriteTimeLimit();
- }
-
- /**
- * Retrieves the maximum ASN.1 element value length that will be allowed by
- * this connection handler.
- *
- * @return The maximum ASN.1 element value length that will be allowed by this
- * connection handler.
- */
- public int getMaxRequestSize()
- {
- return (int) currentConfig.getMaxRequestSize();
- }
-
- /**
- * Retrieves the size in bytes of the LDAP response message write buffer
- * defined for this connection handler.
- *
- * @return The size in bytes of the LDAP response message write buffer.
- */
- public int getBufferSize()
- {
- return (int) currentConfig.getBufferSize();
- }
-
- @Override
- public String getProtocol()
- {
- return protocol;
- }
-
- @Override
- public String getShutdownListenerName()
- {
- return handlerName;
- }
-
- /**
- * Retrieves the SSL client authentication policy for this connection handler.
- *
- * @return The SSL client authentication policy for this connection handler.
- */
- public SSLClientAuthPolicy getSSLClientAuthPolicy()
- {
- return sslClientAuthPolicy;
- }
-
- /**
- * Retrieves the set of statistics maintained by this connection handler.
- *
- * @return The set of statistics maintained by this connection handler.
- */
- public LDAPStatistics getStatTracker()
- {
- return statTracker;
- }
-
- @Override
- public void initializeConnectionHandler(ServerContext serverContext, LDAPConnectionHandlerCfg config)
- throws ConfigException, InitializationException
- {
- if (friendlyName == null)
- {
- friendlyName = config.dn().rdn().getFirstAVA().getAttributeValue().toString();
+ /**
+ * Retrieves the set of active client connections that have been established through this connection handler.
+ *
+ * @return The set of active client connections that have been established through this connection handler.
+ */
+ @Override
+ public Collection<ClientConnection> getClientConnections() {
+ return connectionList;
}
- // Save this configuration for future reference.
- currentConfig = config;
- enabled = config.isEnabled();
- allowedClients = config.getAllowedClient();
- deniedClients = config.getDeniedClient();
-
- // Configure SSL if needed.
- try
- {
- // This call may disable the connector if wrong SSL settings
- configureSSL(config);
- }
- catch (DirectoryException e)
- {
- logger.traceException(e);
- throw new InitializationException(e.getMessageObject());
+ /**
+ * Retrieves the DN of the configuration entry with which this alert generator is associated.
+ *
+ * @return The DN of the configuration entry with which this alert generator is associated.
+ */
+ @Override
+ public DN getComponentEntryDN() {
+ return currentConfig.dn();
}
- // Save properties that cannot be dynamically modified.
- allowReuseAddress = config.isAllowTCPReuseAddress();
- backlog = config.getAcceptBacklog();
- listenAddresses = new HashSet<>();
- for(InetAddress addr :config.getListenAddress()) {
- listenAddresses.add(new InetSocketAddress(addr, config.getListenPort()));
+ @Override
+ public String getConnectionHandlerName() {
+ return handlerName;
}
- // Construct a unique name for this connection handler, and put
- // together the set of listeners.
- listeners = new LinkedList<>();
- StringBuilder nameBuffer = new StringBuilder();
- nameBuffer.append(friendlyName);
- for (InetSocketAddress a : listenAddresses)
- {
- listeners.add(new HostPort(a.getHostName(), a.getPort()));
- nameBuffer.append(" ");
- nameBuffer.append(a.getHostName());
- }
- nameBuffer.append(" port ");
- nameBuffer.append(config.getListenPort());
- handlerName = nameBuffer.toString();
-
- // Attempt to bind to the listen port on all configured addresses to
- // verify whether the connection handler will be able to start.
- LocalizableMessage errorMessage =
- checkAnyListenAddressInUse(config.getListenAddress(), config.getListenPort(),
- allowReuseAddress, config.dn());
- if (errorMessage != null)
- {
- logger.error(errorMessage);
- throw new InitializationException(errorMessage);
+ @Override
+ public Collection<String> getEnabledSSLCipherSuites() {
+ final SSLEngine engine = sslEngine;
+ if (engine != null) {
+ return Arrays.asList(engine.getEnabledCipherSuites());
+ }
+ return super.getEnabledSSLCipherSuites();
}
- // Create a system property to store the LDAP(S) port the server is
- // listening to. This information can be displayed with jinfo.
- System.setProperty(protocol + "_port", String.valueOf(config.getListenPort()));
-
- // Create and start a connection finalizer thread for this
- // connection handler.
- connectionFinalizer = Executors
- .newSingleThreadScheduledExecutor(new DirectoryThread.Factory(
- "LDAP Connection Finalizer for connection handler " + toString()));
-
- connectionFinalizerActiveJobQueue = new ArrayList<>();
- connectionFinalizerPendingJobQueue = new ArrayList<>();
-
- connectionFinalizer.scheduleWithFixedDelay(
- new ConnectionFinalizerRunnable(), 100, 100, TimeUnit.MILLISECONDS);
-
- // Register the set of supported LDAP versions.
- DirectoryServer.registerSupportedLDAPVersion(3, this);
- if (config.isAllowLDAPV2())
- {
- DirectoryServer.registerSupportedLDAPVersion(2, this);
+ @Override
+ public Collection<String> getEnabledSSLProtocols() {
+ final SSLEngine engine = sslEngine;
+ if (engine != null) {
+ return Arrays.asList(engine.getEnabledProtocols());
+ }
+ return super.getEnabledSSLProtocols();
}
- // Create and register monitors.
- statTracker = new LDAPStatistics(handlerName + " Statistics");
- DirectoryServer.registerMonitorProvider(statTracker);
-
- connMonitor = new ClientConnectionMonitorProvider(this);
- DirectoryServer.registerMonitorProvider(connMonitor);
-
- // Register this as a change listener.
- config.addLDAPChangeListener(this);
- }
-
- @Override
- public boolean isConfigurationAcceptable(ConnectionHandlerCfg configuration,
- List<LocalizableMessage> unacceptableReasons)
- {
- LDAPConnectionHandlerCfg config = (LDAPConnectionHandlerCfg) configuration;
-
- if (currentConfig == null
- || (!currentConfig.isEnabled() && config.isEnabled()))
- {
- // Attempt to bind to the listen port on all configured addresses to
- // verify whether the connection handler will be able to start.
- LocalizableMessage errorMessage =
- checkAnyListenAddressInUse(config.getListenAddress(), config
- .getListenPort(), config.isAllowTCPReuseAddress(), config.dn());
- if (errorMessage != null)
- {
- unacceptableReasons.add(errorMessage);
- return false;
- }
+ @Override
+ public Collection<HostPort> getListeners() {
+ return listeners;
}
- if (config.isEnabled()
+ /**
+ * Retrieves the maximum length of time in milliseconds that attempts to write to LDAP client connections should be
+ * allowed to block.
+ *
+ * @return The maximum length of time in milliseconds that attempts to write to LDAP client connections should be
+ * allowed to block, or zero if there should not be any limit imposed.
+ */
+ public long getMaxBlockedWriteTimeLimit() {
+ return currentConfig.getMaxBlockedWriteTimeLimit();
+ }
+
+ /**
+ * Retrieves the maximum ASN.1 element value length that will be allowed by this connection handler.
+ *
+ * @return The maximum ASN.1 element value length that will be allowed by this connection handler.
+ */
+ public int getMaxRequestSize() {
+ return (int) currentConfig.getMaxRequestSize();
+ }
+
+ /**
+ * Retrieves the size in bytes of the LDAP response message write buffer defined for this connection handler.
+ *
+ * @return The size in bytes of the LDAP response message write buffer.
+ */
+ public int getBufferSize() {
+ return (int) currentConfig.getBufferSize();
+ }
+
+ @Override
+ public String getProtocol() {
+ return protocol;
+ }
+
+ @Override
+ public String getShutdownListenerName() {
+ return handlerName;
+ }
+
+ /**
+ * Retrieves the SSL client authentication policy for this connection handler.
+ *
+ * @return The SSL client authentication policy for this connection handler.
+ */
+ public SSLClientAuthPolicy getSSLClientAuthPolicy() {
+ return sslClientAuthPolicy;
+ }
+
+ /**
+ * Retrieves the set of statistics maintained by this connection handler.
+ *
+ * @return The set of statistics maintained by this connection handler.
+ */
+ public LDAPStatistics getStatTracker() {
+ return statTracker;
+ }
+
+ @Override
+ public void initializeConnectionHandler(ServerContext serverContext, LDAPConnectionHandlerCfg config)
+ throws ConfigException, InitializationException {
+ if (friendlyName == null) {
+ friendlyName = config.dn().rdn().getFirstAVA().getAttributeValue().toString();
+ }
+
+ // Save this configuration for future reference.
+ currentConfig = config;
+ enabled = config.isEnabled();
+ allowedClients = config.getAllowedClient();
+ deniedClients = config.getDeniedClient();
+
+ // Configure SSL if needed.
+ try {
+ // This call may disable the connector if wrong SSL settings
+ configureSSL(config);
+ } catch (DirectoryException e) {
+ logger.traceException(e);
+ throw new InitializationException(e.getMessageObject());
+ }
+
+ // Save properties that cannot be dynamically modified.
+ allowReuseAddress = config.isAllowTCPReuseAddress();
+ backlog = config.getAcceptBacklog();
+ listenAddresses = new HashSet<>();
+ for (InetAddress addr : config.getListenAddress()) {
+ listenAddresses.add(new InetSocketAddress(addr, config.getListenPort()));
+ }
+
+ // Construct a unique name for this connection handler, and put
+ // together the set of listeners.
+ listeners = new LinkedList<>();
+ StringBuilder nameBuffer = new StringBuilder();
+ nameBuffer.append(friendlyName);
+ for (InetSocketAddress a : listenAddresses) {
+ listeners.add(new HostPort(a.getHostName(), a.getPort()));
+ nameBuffer.append(" ");
+ nameBuffer.append(a.getHostName());
+ }
+ nameBuffer.append(" port ");
+ nameBuffer.append(config.getListenPort());
+ handlerName = nameBuffer.toString();
+
+ // Attempt to bind to the listen port on all configured addresses to
+ // verify whether the connection handler will be able to start.
+ LocalizableMessage errorMessage = checkAnyListenAddressInUse(config.getListenAddress(), config.getListenPort(),
+ allowReuseAddress, config.dn());
+ if (errorMessage != null) {
+ logger.error(errorMessage);
+ throw new InitializationException(errorMessage);
+ }
+
+ // Create a system property to store the LDAP(S) port the server is
+ // listening to. This information can be displayed with jinfo.
+ System.setProperty(protocol + "_port", String.valueOf(config.getListenPort()));
+
+ // Create and start a connection finalizer thread for this
+ // connection handler.
+ connectionFinalizer = Executors.newSingleThreadScheduledExecutor(new DirectoryThread.Factory(
+ "LDAP Connection Finalizer for connection handler " + toString()));
+
+ connectionFinalizerActiveJobQueue = new ArrayList<>();
+ connectionFinalizerPendingJobQueue = new ArrayList<>();
+
+ connectionFinalizer.scheduleWithFixedDelay(new ConnectionFinalizerRunnable(), 100, 100, TimeUnit.MILLISECONDS);
+
+ // Register the set of supported LDAP versions.
+ DirectoryServer.registerSupportedLDAPVersion(3, this);
+ if (config.isAllowLDAPV2()) {
+ DirectoryServer.registerSupportedLDAPVersion(2, this);
+ }
+
+ // Create and register monitors.
+ statTracker = new LDAPStatistics(handlerName + " Statistics");
+ DirectoryServer.registerMonitorProvider(statTracker);
+
+ connMonitor = new ClientConnectionMonitorProvider(this);
+ DirectoryServer.registerMonitorProvider(connMonitor);
+
+ // Register this as a change listener.
+ config.addLDAPChangeListener(this);
+ }
+
+ @Override
+ public boolean isConfigurationAcceptable(ConnectionHandlerCfg configuration,
+ List<LocalizableMessage> unacceptableReasons) {
+ LDAPConnectionHandlerCfg config = (LDAPConnectionHandlerCfg) configuration;
+
+ if (currentConfig == null || (!currentConfig.isEnabled() && config.isEnabled())) {
+ // Attempt to bind to the listen port on all configured addresses to
+ // verify whether the connection handler will be able to start.
+ LocalizableMessage errorMessage = checkAnyListenAddressInUse(config.getListenAddress(),
+ config.getListenPort(), config.isAllowTCPReuseAddress(), config.dn());
+ if (errorMessage != null) {
+ unacceptableReasons.add(errorMessage);
+ return false;
+ }
+ }
+
+ if (config.isEnabled()
// Check that the SSL configuration is valid.
- && (config.isUseSSL() || config.isAllowStartTLS()))
- {
- try
- {
- createSSLEngine(config, createSSLContext(config));
- }
- catch (DirectoryException e)
- {
- logger.traceException(e);
+ && (config.isUseSSL() || config.isAllowStartTLS())) {
+ try {
+ createSSLEngine(config, createSSLContext(config));
+ } catch (DirectoryException e) {
+ logger.traceException(e);
- unacceptableReasons.add(e.getMessageObject());
- return false;
- }
- }
-
- return true;
- }
-
- /**
- * Checks whether any listen address is in use for the given port. The check
- * is performed by binding to each address and port.
- *
- * @param listenAddresses
- * the listen {@link InetAddress} to test
- * @param listenPort
- * the listen port to test
- * @param allowReuseAddress
- * whether addresses can be reused
- * @param configEntryDN
- * the configuration entry DN
- * @return an error message if at least one of the address is already in use,
- * null otherwise.
- */
- private LocalizableMessage checkAnyListenAddressInUse(
- Collection<InetAddress> listenAddresses, int listenPort,
- boolean allowReuseAddress, DN configEntryDN)
- {
- for (InetAddress a : listenAddresses)
- {
- try
- {
- if (StaticUtils.isAddressInUse(a, listenPort, allowReuseAddress))
- {
- throw new IOException(ERR_CONNHANDLER_ADDRESS_INUSE.get().toString());
+ unacceptableReasons.add(e.getMessageObject());
+ return false;
+ }
}
- }
- catch (IOException e)
- {
- logger.traceException(e);
- return ERR_CONNHANDLER_CANNOT_BIND.get("LDAP", configEntryDN, a.getHostAddress(), listenPort,
- getExceptionMessage(e));
- }
+
+ return true;
}
- return null;
- }
- @Override
- public boolean isConfigurationChangeAcceptable(
- LDAPConnectionHandlerCfg config, List<LocalizableMessage> unacceptableReasons)
- {
- return isConfigurationAcceptable(config, unacceptableReasons);
- }
-
- @Override
- public void processServerShutdown(LocalizableMessage reason)
- {
- shutdownRequested = true;
- }
-
- void stopListener()
- {
- if (listener != null)
- {
- listener.close();
- listener = null;
+ /**
+ * Checks whether any listen address is in use for the given port. The check is performed by binding to each address
+ * and port.
+ *
+ * @param listenAddresses
+ * the listen {@link InetAddress} to test
+ * @param listenPort
+ * the listen port to test
+ * @param allowReuseAddress
+ * whether addresses can be reused
+ * @param configEntryDN
+ * the configuration entry DN
+ * @return an error message if at least one of the address is already in use, null otherwise.
+ */
+ private LocalizableMessage checkAnyListenAddressInUse(Collection<InetAddress> listenAddresses, int listenPort,
+ boolean allowReuseAddress, DN configEntryDN) {
+ for (InetAddress a : listenAddresses) {
+ try {
+ if (StaticUtils.isAddressInUse(a, listenPort, allowReuseAddress)) {
+ throw new IOException(ERR_CONNHANDLER_ADDRESS_INUSE.get().toString());
+ }
+ } catch (IOException e) {
+ logger.traceException(e);
+ return ERR_CONNHANDLER_CANNOT_BIND.get("LDAP", configEntryDN, a.getHostAddress(), listenPort,
+ getExceptionMessage(e));
+ }
+ }
+ return null;
}
- }
- private void startListener() throws IOException
- {
- listener = new GrizzlyLDAPListener(
- listenAddresses,
- Options.defaultOptions()
- .set(LDAPListener.CONNECT_MAX_BACKLOG, backlog)
- .set(LDAPListener.REQUEST_MAX_SIZE_IN_BYTES, (int) currentConfig.getMaxRequestSize()),
- new Function<LDAPClientContext,
- ReactiveHandler<LDAPClientContext,LdapRawMessage, Stream<Response>>,
- LdapException>() {
+ @Override
+ public boolean isConfigurationChangeAcceptable(LDAPConnectionHandlerCfg config,
+ List<LocalizableMessage> unacceptableReasons) {
+ return isConfigurationAcceptable(config, unacceptableReasons);
+ }
+
+ @Override
+ public void processServerShutdown(LocalizableMessage reason) {
+ shutdownRequested = true;
+ }
+
+ void stopListener() {
+ if (listener != null) {
+ listener.close();
+ listener = null;
+ }
+ }
+
+ private void startListener() throws IOException {
+ listener = new GrizzlyLDAPListener(
+ listenAddresses,
+ Options.defaultOptions().set(LDAPListener.CONNECT_MAX_BACKLOG, backlog)
+ .set(LDAPListener.REQUEST_MAX_SIZE_IN_BYTES, (int) currentConfig.getMaxRequestSize()),
+ new Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
+ LdapException>() {
@Override
- public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>
- apply(LDAPClientContext clientContext) throws LdapException {
+ public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> apply(
+ LDAPClientContext clientContext) throws LdapException {
final LDAPClientConnection2 conn = canAccept(clientContext);
return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
@Override
- public Single<Stream<Response>> handle(LDAPClientContext context,
- LdapRawMessage request) throws Exception {
+ public Single<Stream<Response>> handle(LDAPClientContext context, LdapRawMessage request)
+ throws Exception {
return conn.handle(queueingStrategy, request);
}
};
}
});
- }
+ }
- /**
- * Operates in a loop, accepting new connections and ensuring that requests on
- * those connections are handled properly.
- */
- @Override
- public void run()
- {
- setName(handlerName);
- boolean starting = true;
- setName(handlerName);
+ /**
+ * Operates in a loop, accepting new connections and ensuring that requests on those connections are handled
+ * properly.
+ */
+ @Override
+ public void run() {
+ setName(handlerName);
+ boolean starting = true;
+ setName(handlerName);
- boolean lastIterationFailed = false;
+ boolean lastIterationFailed = false;
- while (!shutdownRequested)
- {
- // If this connection handler is not enabled, then just sleep for a bit and check again.
- if (!this.enabled)
- {
- if (listener != null)
- {
- stopListener();
+ while (!shutdownRequested) {
+ // If this connection handler is not enabled, then just sleep for a bit and check again.
+ if (!this.enabled) {
+ if (listener != null) {
+ stopListener();
+ }
+
+ if (starting) {
+ // This may happen if there was an initialisation error which led to disable the connector.
+ // The main thread is waiting for the connector to listen on its port, which will not occur yet,
+ // so notify here to allow the server startup to complete.
+ synchronized (waitListen) {
+ starting = false;
+ waitListen.notify();
+ }
+ }
+
+ StaticUtils.sleep(1000);
+ continue;
+ }
+
+ if (listener != null) {
+ // If already listening, then sleep for a bit and check again.
+ StaticUtils.sleep(1000);
+ continue;
+ }
+
+ try {
+ // At this point, the connection Handler either started correctly or failed
+ // to start but the start process should be notified and resume its work in any cases.
+ synchronized (waitListen) {
+ waitListen.notify();
+ }
+
+ // If we have gotten here, then we are about to start listening
+ // for the first time since startup or since we were previously disabled.
+ // Start the embedded HTTP server
+ startListener();
+ lastIterationFailed = false;
+ } catch (Exception e) {
+ // Clean up the messed up HTTP server
+ stopListener();
+
+ // Error + alert about the horked config
+ logger.traceException(e);
+ logger.error(ERR_CONNHANDLER_CANNOT_ACCEPT_CONNECTION, friendlyName, currentConfig.dn(),
+ getExceptionMessage(e));
+
+ if (lastIterationFailed) {
+ // The last time through the accept loop we also encountered a failure.
+ // Rather than enter a potential infinite loop of failures,
+ // disable this acceptor and log an error.
+ LocalizableMessage message = ERR_CONNHANDLER_CONSECUTIVE_ACCEPT_FAILURES.get(friendlyName,
+ currentConfig.dn(), stackTraceToSingleLineString(e));
+ logger.error(message);
+
+ DirectoryServer.sendAlertNotification(this,
+ ALERT_TYPE_HTTP_CONNECTION_HANDLER_CONSECUTIVE_FAILURES, message);
+ this.enabled = false;
+ } else {
+ lastIterationFailed = true;
+ }
+ }
}
- if (starting)
- {
- // This may happen if there was an initialisation error which led to disable the connector.
- // The main thread is waiting for the connector to listen on its port, which will not occur yet,
- // so notify here to allow the server startup to complete.
- synchronized (waitListen)
- {
- starting = false;
- waitListen.notify();
- }
- }
-
- StaticUtils.sleep(1000);
- continue;
- }
-
- if (listener != null)
- {
- // If already listening, then sleep for a bit and check again.
- StaticUtils.sleep(1000);
- continue;
- }
-
- try
- {
- // At this point, the connection Handler either started correctly or failed
- // to start but the start process should be notified and resume its work in any cases.
- synchronized (waitListen)
- {
- waitListen.notify();
- }
-
- // If we have gotten here, then we are about to start listening
- // for the first time since startup or since we were previously disabled.
- // Start the embedded HTTP server
- startListener();
- lastIterationFailed = false;
- }
- catch (Exception e)
- {
- // Clean up the messed up HTTP server
+ // Initiate shutdown
stopListener();
+ }
- // Error + alert about the horked config
- logger.traceException(e);
- logger.error(
- ERR_CONNHANDLER_CANNOT_ACCEPT_CONNECTION, friendlyName, currentConfig.dn(), getExceptionMessage(e));
-
- if (lastIterationFailed)
- {
- // The last time through the accept loop we also encountered a failure.
- // Rather than enter a potential infinite loop of failures,
- // disable this acceptor and log an error.
- LocalizableMessage message = ERR_CONNHANDLER_CONSECUTIVE_ACCEPT_FAILURES.get(
- friendlyName, currentConfig.dn(), stackTraceToSingleLineString(e));
- logger.error(message);
-
- DirectoryServer.sendAlertNotification(this, ALERT_TYPE_HTTP_CONNECTION_HANDLER_CONSECUTIVE_FAILURES, message);
- this.enabled = false;
+ private LDAPClientConnection2 canAccept(LDAPClientContext clientContext) throws LdapException {
+ // Check to see if the core server rejected the
+ // connection (e.g., already too many connections
+ // established).
+ final LDAPClientConnection2 clientConnection = new LDAPClientConnection2(this, clientContext, getProtocol(),
+ currentConfig.isKeepStats());
+ if (clientConnection.getConnectionID() < 0) {
+ clientConnection.disconnect(DisconnectReason.ADMIN_LIMIT_EXCEEDED, true,
+ ERR_CONNHANDLER_REJECTED_BY_SERVER.get());
+ throw LdapException.newLdapException(ResultCode.ADMIN_LIMIT_EXCEEDED);
}
- else
- {
- lastIterationFailed = true;
+
+ InetAddress clientAddr = clientConnection.getRemoteAddress();
+ // Check to see if the client is on the denied list.
+ // If so, then reject it immediately.
+ if (!deniedClients.isEmpty() && AddressMask.matchesAny(deniedClients, clientAddr)) {
+ clientConnection.disconnect(
+ DisconnectReason.CONNECTION_REJECTED,
+ currentConfig.isSendRejectionNotice(),
+ ERR_CONNHANDLER_DENIED_CLIENT.get(clientConnection.getClientHostPort(),
+ clientConnection.getServerHostPort()));
+ throw LdapException.newLdapException(ResultCode.CONSTRAINT_VIOLATION);
}
- }
- }
+ // Check to see if there is an allowed list and if
+ // there is whether the client is on that list. If
+ // not, then reject the connection.
+ if (!allowedClients.isEmpty() && !AddressMask.matchesAny(allowedClients, clientAddr)) {
+ clientConnection.disconnect(
+ DisconnectReason.CONNECTION_REJECTED,
+ currentConfig.isSendRejectionNotice(),
+ ERR_CONNHANDLER_DISALLOWED_CLIENT.get(clientConnection.getClientHostPort(),
+ clientConnection.getServerHostPort()));
+ throw LdapException.newLdapException(ResultCode.CONSTRAINT_VIOLATION);
+ }
- // Initiate shutdown
- stopListener();
- }
-
- private LDAPClientConnection2 canAccept(LDAPClientContext clientContext) throws LdapException
- {
- // Check to see if the core server rejected the
- // connection (e.g., already too many connections
- // established).
- final LDAPClientConnection2 clientConnection =
- new LDAPClientConnection2(this, clientContext, getProtocol(), currentConfig.isKeepStats());
- if (clientConnection.getConnectionID() < 0)
- {
- clientConnection.disconnect(
- DisconnectReason.ADMIN_LIMIT_EXCEEDED, true, ERR_CONNHANDLER_REJECTED_BY_SERVER.get());
- throw LdapException.newLdapException(ResultCode.ADMIN_LIMIT_EXCEEDED);
- }
-
- InetAddress clientAddr = clientConnection.getRemoteAddress();
- // Check to see if the client is on the denied list.
- // If so, then reject it immediately.
- if (!deniedClients.isEmpty()
- && AddressMask.matchesAny(deniedClients, clientAddr))
- {
- clientConnection.disconnect(DisconnectReason.CONNECTION_REJECTED,
- currentConfig.isSendRejectionNotice(), ERR_CONNHANDLER_DENIED_CLIENT
- .get(clientConnection.getClientHostPort(), clientConnection
- .getServerHostPort()));
- throw LdapException.newLdapException(ResultCode.CONSTRAINT_VIOLATION);
- }
- // Check to see if there is an allowed list and if
- // there is whether the client is on that list. If
- // not, then reject the connection.
- if (!allowedClients.isEmpty()
- && !AddressMask.matchesAny(allowedClients, clientAddr))
- {
- clientConnection.disconnect(DisconnectReason.CONNECTION_REJECTED,
- currentConfig.isSendRejectionNotice(),
- ERR_CONNHANDLER_DISALLOWED_CLIENT.get(clientConnection
- .getClientHostPort(), clientConnection.getServerHostPort()));
- throw LdapException.newLdapException(ResultCode.CONSTRAINT_VIOLATION);
- }
-
- // If we've gotten here, then we'll take the
- // connection so invoke the post-connect plugins and
- // register the client connection with a request
- // handler.
- try
- {
- PluginConfigManager pluginManager = DirectoryServer
- .getPluginConfigManager();
- PluginResult.PostConnect pluginResult = pluginManager
- .invokePostConnectPlugins(clientConnection);
- if (!pluginResult.continueProcessing())
- {
- clientConnection.disconnect(pluginResult.getDisconnectReason(),
- pluginResult.sendDisconnectNotification(),
- pluginResult.getErrorMessage());
- throw LdapException.newLdapException(ResultCode.CONSTRAINT_VIOLATION);
- }
- }
- catch (Exception e)
- {
- logger.traceException(e);
-
- LocalizableMessage message =
- INFO_CONNHANDLER_UNABLE_TO_REGISTER_CLIENT.get(clientConnection
- .getClientHostPort(), clientConnection.getServerHostPort(),
- getExceptionMessage(e));
- logger.debug(message);
-
- clientConnection.disconnect(DisconnectReason.SERVER_ERROR,
- currentConfig.isSendRejectionNotice(), message);
- throw LdapException.newLdapException(ResultCode.OPERATIONS_ERROR);
- }
-
- if (useSSL()) {
+ // If we've gotten here, then we'll take the
+ // connection so invoke the post-connect plugins and
+ // register the client connection with a request
+ // handler.
try {
- clientContext.enableTLS(createSSLEngine());
- } catch (DirectoryException e) {
- throw LdapException.newLdapException(e.getResultCode(), e);
+ PluginConfigManager pluginManager = DirectoryServer.getPluginConfigManager();
+ PluginResult.PostConnect pluginResult = pluginManager.invokePostConnectPlugins(clientConnection);
+ if (!pluginResult.continueProcessing()) {
+ clientConnection.disconnect(pluginResult.getDisconnectReason(),
+ pluginResult.sendDisconnectNotification(), pluginResult.getErrorMessage());
+ throw LdapException.newLdapException(ResultCode.CONSTRAINT_VIOLATION);
+ }
+ } catch (Exception e) {
+ logger.traceException(e);
+
+ LocalizableMessage message = INFO_CONNHANDLER_UNABLE_TO_REGISTER_CLIENT.get(
+ clientConnection.getClientHostPort(), clientConnection.getServerHostPort(), getExceptionMessage(e));
+ logger.debug(message);
+
+ clientConnection.disconnect(DisconnectReason.SERVER_ERROR, currentConfig.isSendRejectionNotice(), message);
+ throw LdapException.newLdapException(ResultCode.OPERATIONS_ERROR);
+ }
+
+ if (useSSL()) {
+ try {
+ clientContext.enableTLS(createSSLEngine());
+ } catch (DirectoryException e) {
+ throw LdapException.newLdapException(e.getResultCode(), e);
+ }
+ }
+
+ return clientConnection;
+ }
+
+ /**
+ * Appends a string representation of this connection handler to the provided buffer.
+ *
+ * @param buffer
+ * The buffer to which the information should be appended.
+ */
+ @Override
+ public void toString(StringBuilder buffer) {
+ buffer.append(handlerName);
+ }
+
+ /**
+ * Indicates whether this connection handler should use SSL to communicate with clients.
+ *
+ * @return {@code true} if this connection handler should use SSL to communicate with clients, or {@code false} if
+ * not.
+ */
+ public boolean useSSL() {
+ return currentConfig.isUseSSL();
+ }
+
+ SSLEngine createSSLEngine() throws DirectoryException {
+ return createSSLEngine(currentConfig, sslContext);
+ }
+
+ private SSLEngine createSSLEngine(LDAPConnectionHandlerCfg config, SSLContext sslContext)
+ throws DirectoryException {
+ try {
+ SSLEngine sslEngine = sslContext.createSSLEngine();
+ sslEngine.setUseClientMode(false);
+
+ final Set<String> protocols = config.getSSLProtocol();
+ if (!protocols.isEmpty()) {
+ sslEngine.setEnabledProtocols(protocols.toArray(new String[0]));
+ }
+
+ final Set<String> ciphers = config.getSSLCipherSuite();
+ if (!ciphers.isEmpty()) {
+ sslEngine.setEnabledCipherSuites(ciphers.toArray(new String[0]));
+ }
+
+ switch (config.getSSLClientAuthPolicy()) {
+ case DISABLED:
+ sslEngine.setNeedClientAuth(false);
+ sslEngine.setWantClientAuth(false);
+ break;
+ case REQUIRED:
+ sslEngine.setWantClientAuth(true);
+ sslEngine.setNeedClientAuth(true);
+ break;
+ case OPTIONAL:
+ default:
+ sslEngine.setNeedClientAuth(false);
+ sslEngine.setWantClientAuth(true);
+ break;
+ }
+
+ return sslEngine;
+ } catch (Exception e) {
+ logger.traceException(e);
+ ResultCode resCode = DirectoryServer.getServerErrorResultCode();
+ LocalizableMessage message = ERR_CONNHANDLER_SSL_CANNOT_INITIALIZE.get(getExceptionMessage(e));
+ throw new DirectoryException(resCode, message, e);
}
}
- return clientConnection;
- }
-
- /**
- * Appends a string representation of this connection handler to the provided
- * buffer.
- *
- * @param buffer
- * The buffer to which the information should be appended.
- */
- @Override
- public void toString(StringBuilder buffer)
- {
- buffer.append(handlerName);
- }
-
- /**
- * Indicates whether this connection handler should use SSL to communicate
- * with clients.
- *
- * @return {@code true} if this connection handler should use SSL to
- * communicate with clients, or {@code false} if not.
- */
- public boolean useSSL()
- {
- return currentConfig.isUseSSL();
- }
-
- SSLEngine createSSLEngine() throws DirectoryException {
- return createSSLEngine(currentConfig, sslContext);
- }
-
- private SSLEngine createSSLEngine(LDAPConnectionHandlerCfg config, SSLContext sslContext) throws DirectoryException
- {
- try
- {
- SSLEngine sslEngine = sslContext.createSSLEngine();
- sslEngine.setUseClientMode(false);
-
- final Set<String> protocols = config.getSSLProtocol();
- if (!protocols.isEmpty())
- {
- sslEngine.setEnabledProtocols(protocols.toArray(new String[0]));
- }
-
- final Set<String> ciphers = config.getSSLCipherSuite();
- if (!ciphers.isEmpty())
- {
- sslEngine.setEnabledCipherSuites(ciphers.toArray(new String[0]));
- }
-
- switch (config.getSSLClientAuthPolicy())
- {
- case DISABLED:
- sslEngine.setNeedClientAuth(false);
- sslEngine.setWantClientAuth(false);
- break;
- case REQUIRED:
- sslEngine.setWantClientAuth(true);
- sslEngine.setNeedClientAuth(true);
- break;
- case OPTIONAL:
- default:
- sslEngine.setNeedClientAuth(false);
- sslEngine.setWantClientAuth(true);
- break;
- }
-
- return sslEngine;
- }
- catch (Exception e)
- {
- logger.traceException(e);
- ResultCode resCode = DirectoryServer.getServerErrorResultCode();
- LocalizableMessage message = ERR_CONNHANDLER_SSL_CANNOT_INITIALIZE
- .get(getExceptionMessage(e));
- throw new DirectoryException(resCode, message, e);
- }
- }
-
- private void disableAndWarnIfUseSSL(LDAPConnectionHandlerCfg config)
- {
- if (config.isUseSSL())
- {
- logger.warn(INFO_DISABLE_CONNECTION, friendlyName);
- enabled = false;
- }
- }
-
- private SSLContext createSSLContext(LDAPConnectionHandlerCfg config)
- throws DirectoryException
- {
- try
- {
- DN keyMgrDN = config.getKeyManagerProviderDN();
- KeyManagerProvider<?> keyManagerProvider = DirectoryServer
- .getKeyManagerProvider(keyMgrDN);
- if (keyManagerProvider == null)
- {
- logger.error(ERR_NULL_KEY_PROVIDER_MANAGER, keyMgrDN, friendlyName);
- disableAndWarnIfUseSSL(config);
- keyManagerProvider = new NullKeyManagerProvider();
- // The SSL connection is unusable without a key manager provider
- }
- else if (! keyManagerProvider.containsAtLeastOneKey())
- {
- logger.error(ERR_INVALID_KEYSTORE, friendlyName);
- disableAndWarnIfUseSSL(config);
- }
-
- final SortedSet<String> aliases = new TreeSet<>(config.getSSLCertNickname());
- final KeyManager[] keyManagers;
- if (aliases.isEmpty())
- {
- keyManagers = keyManagerProvider.getKeyManagers();
- }
- else
- {
- final Iterator<String> it = aliases.iterator();
- while (it.hasNext())
- {
- if (!keyManagerProvider.containsKeyWithAlias(it.next()))
- {
- logger.error(ERR_KEYSTORE_DOES_NOT_CONTAIN_ALIAS, aliases, friendlyName);
- it.remove();
- }
+ private void disableAndWarnIfUseSSL(LDAPConnectionHandlerCfg config) {
+ if (config.isUseSSL()) {
+ logger.warn(INFO_DISABLE_CONNECTION, friendlyName);
+ enabled = false;
}
+ }
- if (aliases.isEmpty())
- {
- disableAndWarnIfUseSSL(config);
+ private SSLContext createSSLContext(LDAPConnectionHandlerCfg config) throws DirectoryException {
+ try {
+ DN keyMgrDN = config.getKeyManagerProviderDN();
+ KeyManagerProvider<?> keyManagerProvider = DirectoryServer.getKeyManagerProvider(keyMgrDN);
+ if (keyManagerProvider == null) {
+ logger.error(ERR_NULL_KEY_PROVIDER_MANAGER, keyMgrDN, friendlyName);
+ disableAndWarnIfUseSSL(config);
+ keyManagerProvider = new NullKeyManagerProvider();
+ // The SSL connection is unusable without a key manager provider
+ } else if (!keyManagerProvider.containsAtLeastOneKey()) {
+ logger.error(ERR_INVALID_KEYSTORE, friendlyName);
+ disableAndWarnIfUseSSL(config);
+ }
+
+ final SortedSet<String> aliases = new TreeSet<>(config.getSSLCertNickname());
+ final KeyManager[] keyManagers;
+ if (aliases.isEmpty()) {
+ keyManagers = keyManagerProvider.getKeyManagers();
+ } else {
+ final Iterator<String> it = aliases.iterator();
+ while (it.hasNext()) {
+ if (!keyManagerProvider.containsKeyWithAlias(it.next())) {
+ logger.error(ERR_KEYSTORE_DOES_NOT_CONTAIN_ALIAS, aliases, friendlyName);
+ it.remove();
+ }
+ }
+
+ if (aliases.isEmpty()) {
+ disableAndWarnIfUseSSL(config);
+ }
+ keyManagers = SelectableCertificateKeyManager.wrap(keyManagerProvider.getKeyManagers(), aliases,
+ friendlyName);
+ }
+
+ DN trustMgrDN = config.getTrustManagerProviderDN();
+ TrustManagerProvider<?> trustManagerProvider = DirectoryServer.getTrustManagerProvider(trustMgrDN);
+ if (trustManagerProvider == null) {
+ trustManagerProvider = new NullTrustManagerProvider();
+ }
+
+ SSLContext sslContext = SSLContext.getInstance(SSL_CONTEXT_INSTANCE_NAME);
+ sslContext.init(keyManagers, trustManagerProvider.getTrustManagers(), null);
+ return sslContext;
+ } catch (Exception e) {
+ logger.traceException(e);
+ ResultCode resCode = DirectoryServer.getServerErrorResultCode();
+ LocalizableMessage message = ERR_CONNHANDLER_SSL_CANNOT_INITIALIZE.get(getExceptionMessage(e));
+ throw new DirectoryException(resCode, message, e);
}
- keyManagers = SelectableCertificateKeyManager.wrap(keyManagerProvider.getKeyManagers(), aliases, friendlyName);
- }
-
- DN trustMgrDN = config.getTrustManagerProviderDN();
- TrustManagerProvider<?> trustManagerProvider = DirectoryServer
- .getTrustManagerProvider(trustMgrDN);
- if (trustManagerProvider == null)
- {
- trustManagerProvider = new NullTrustManagerProvider();
- }
-
- SSLContext sslContext = SSLContext.getInstance(SSL_CONTEXT_INSTANCE_NAME);
- sslContext.init(keyManagers, trustManagerProvider.getTrustManagers(),
- null);
- return sslContext;
}
- catch (Exception e)
- {
- logger.traceException(e);
- ResultCode resCode = DirectoryServer.getServerErrorResultCode();
- LocalizableMessage message = ERR_CONNHANDLER_SSL_CANNOT_INITIALIZE
- .get(getExceptionMessage(e));
- throw new DirectoryException(resCode, message, e);
- }
- }
- /**
- * Enqueue a connection finalizer which will be invoked after a short delay.
- *
- * @param r
- * The connection finalizer runnable.
- */
- void registerConnectionFinalizer(Runnable r)
- {
- synchronized (connectionFinalizerLock)
- {
- if (connectionFinalizer != null)
- {
- connectionFinalizerPendingJobQueue.add(r);
- }
- else
- {
- // Already finalized - invoked immediately.
- r.run();
- }
+ /**
+ * Enqueue a connection finalizer which will be invoked after a short delay.
+ *
+ * @param r
+ * The connection finalizer runnable.
+ */
+ void registerConnectionFinalizer(Runnable r) {
+ synchronized (connectionFinalizerLock) {
+ if (connectionFinalizer != null) {
+ connectionFinalizerPendingJobQueue.add(r);
+ } else {
+ // Already finalized - invoked immediately.
+ r.run();
+ }
+ }
}
- }
}
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/ProxyToHandler.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/ProxyToHandler.java
deleted file mode 100644
index b468952..0000000
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/ProxyToHandler.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * The contents of this file are subject to the terms of the Common Development and
- * Distribution License (the License). You may not use this file except in compliance with the
- * License.
- *
- * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
- * specific language governing permission and limitations under the License.
- *
- * When distributing Covered Software, include this CDDL Header Notice in each file and include
- * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
- * Header, with the fields enclosed by brackets [] replaced by your own identifying
- * information: "Portions Copyright [year] [name of copyright owner]".
- *
- * Copyright 2016 ForgeRock AS.
- */
-package org.forgerock.opendj.reactive;
-
-import static com.forgerock.reactive.RxJavaStreams.*;
-import static org.forgerock.util.Utils.closeSilently;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.forgerock.opendj.ldap.Connection;
-import org.forgerock.opendj.ldap.IntermediateResponseHandler;
-import org.forgerock.opendj.ldap.LDAPConnectionFactory;
-import org.forgerock.opendj.ldap.LdapException;
-import org.forgerock.opendj.ldap.SearchResultHandler;
-import org.forgerock.opendj.ldap.requests.CompareRequest;
-import org.forgerock.opendj.ldap.requests.ExtendedRequest;
-import org.forgerock.opendj.ldap.requests.Request;
-import org.forgerock.opendj.ldap.requests.SearchRequest;
-import org.forgerock.opendj.ldap.responses.IntermediateResponse;
-import org.forgerock.opendj.ldap.responses.Response;
-import org.forgerock.opendj.ldap.responses.Result;
-import org.forgerock.opendj.ldap.responses.SearchResultEntry;
-import org.forgerock.opendj.ldap.responses.SearchResultReference;
-import org.forgerock.opendj.ldif.ChangeRecord;
-import org.forgerock.util.promise.ExceptionHandler;
-import org.forgerock.util.promise.ResultHandler;
-import org.forgerock.util.promise.RuntimeExceptionHandler;
-
-import com.forgerock.reactive.ReactiveHandler;
-import com.forgerock.reactive.Single;
-import com.forgerock.reactive.Stream;
-
-import io.reactivex.Flowable;
-import io.reactivex.FlowableEmitter;
-import io.reactivex.FlowableEmitter.BackpressureMode;
-import io.reactivex.FlowableOnSubscribe;
-import io.reactivex.functions.Action;
-
-/** Forward {@link Request} to another server reached through the {@link LDAPConnectionFactory}. */
-final class ProxyToHandler implements ReactiveHandler<LDAPClientConnection2, Request, Stream<Response>> {
-
- private final LDAPConnectionFactory connectionFactory;
-
- ProxyToHandler(final LDAPConnectionFactory connectionFactory) {
- this.connectionFactory = connectionFactory;
- }
-
- @Override
- public Single<Stream<Response>> handle(final LDAPClientConnection2 context, final Request request) {
- return singleFromPublisher(Flowable.create(new FlowableOnSubscribe<Stream<Response>>() {
- @Override
- public void subscribe(final FlowableEmitter<Stream<Response>> emitter) throws Exception {
- final AtomicReference<Connection> connectionHolder = new AtomicReference<Connection>();
-
- connectionFactory.getConnectionAsync().thenOnResult(new ResultHandler<Connection>() {
- @Override
- public void handleResult(final Connection connection) {
- connectionHolder.set(connection);
- emitter.onNext(executeRequest(connection, request));
- emitter.onComplete();
- }
- }).thenOnException(new ExceptionHandler<LdapException>() {
- @Override
- public void handleException(LdapException exception) {
- emitter.onError(exception);
- }
- }).thenOnRuntimeException(new RuntimeExceptionHandler() {
- @Override
- public void handleRuntimeException(RuntimeException exception) {
- emitter.onError(exception);
- }
- });
- }
- }, BackpressureMode.ERROR));
- }
-
- private Stream<Response> executeRequest(final Connection connection, final Request request) {
- return streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
- @Override
- public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
- // add/delete/modifyDN/modifyReq
- final PublisherAdaptor adapter = new PublisherAdaptor(emitter);
- if (request instanceof ChangeRecord) {
- connection.applyChangeAsync((ChangeRecord) request, adapter).thenOnResult(adapter)
- .thenOnException(adapter).thenOnRuntimeException(adapter);
- } else if (request instanceof CompareRequest) {
- connection.compareAsync((CompareRequest) request, adapter).thenOnResult(adapter)
- .thenOnException(adapter).thenOnRuntimeException(adapter);
- } else if (request instanceof ExtendedRequest) {
- connection.compareAsync((CompareRequest) request, adapter).thenOnResult(adapter)
- .thenOnException(adapter).thenOnRuntimeException(adapter);
- } else if (request instanceof SearchRequest) {
- connection.searchAsync((SearchRequest) request, adapter).thenOnResult(adapter)
- .thenOnException(adapter).thenOnRuntimeException(adapter);
- } else {
- emitter.onError(new IllegalArgumentException("Unsupported request type"));
- }
- }
- }, BackpressureMode.ERROR).doAfterTerminate(new Action() {
- @Override
- public void run() throws Exception {
- closeSilently(connection);
- }
- }));
- }
-
- /** Adaptor forwarding events from the SDK handler to the emitter. */
- private final class PublisherAdaptor implements IntermediateResponseHandler, SearchResultHandler,
- ResultHandler<Result>, ExceptionHandler<Exception>, RuntimeExceptionHandler {
-
- private final FlowableEmitter<Response> emitter;
-
- PublisherAdaptor(final FlowableEmitter<Response> emitter) {
- this.emitter = emitter;
- }
-
- private boolean handle(final Response response) {
- emitter.onNext(response);
- return true;
- }
-
- @Override
- public boolean handleEntry(SearchResultEntry entry) {
- return handle(entry);
- }
-
- @Override
- public boolean handleReference(SearchResultReference reference) {
- return handle(reference);
- }
-
- @Override
- public boolean handleIntermediateResponse(IntermediateResponse response) {
- return handle(response);
- }
-
- @Override
- public void handleResult(Result result) {
- if (result != null) {
- handle(result);
- }
- emitter.onComplete();
- }
-
- @Override
- public void handleRuntimeException(RuntimeException exception) {
- emitter.onError(exception);
- }
-
- @Override
- public void handleException(Exception exception) {
- emitter.onError(exception);
- }
- }
-}
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Route.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Route.java
deleted file mode 100644
index 3cbe6a0..0000000
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Route.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * The contents of this file are subject to the terms of the Common Development and
- * Distribution License (the License). You may not use this file except in compliance with the
- * License.
- *
- * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
- * specific language governing permission and limitations under the License.
- *
- * When distributing Covered Software, include this CDDL Header Notice in each file and include
- * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
- * Header, with the fields enclosed by brackets [] replaced by your own identifying
- * information: "Portions Copyright [year] [name of copyright owner]".
- *
- * Copyright 2016 ForgeRock AS.
- */
-package org.forgerock.opendj.reactive;
-
-import static org.forgerock.util.Reject.checkNotNull;
-
-import com.forgerock.opendj.util.Predicate;
-import com.forgerock.reactive.ReactiveHandler;
-
-/**
- * Contains a routing predicate and a handler. If the predicate matches, the handler will be invoked.
- *
- * @param <CTX>
- * Type of context in which request are processed
- * @param <REQ>
- * Type of routed request
- * @param <REP>
- * Type of routed response
- */
-public final class Route<CTX, REQ, REP> {
- final Predicate<REQ, CTX> predicate;
- final ReactiveHandler<CTX, REQ, REP> handler;
-
- /**
- * Creates a new route.
- *
- * @param <CTX>
- * Type of context in which request are processed
- * @param <REQ>
- * Type of routed request
- * @param <REP>
- * Type of routed response
- * @param predicate
- * The {@link Predicate} which must be fulfilled to apply this route
- * @param target
- * The {@link ReactiveHandler} to invoke when this route is applied
- * @return a new {@link Route}
- */
- public static <CTX, REQ, REP> Route<CTX, REQ, REP> newRoute(final Predicate<REQ, CTX> predicate,
- final ReactiveHandler<CTX, REQ, REP> target) {
- return new Route<>(predicate, target);
- }
-
- private Route(final Predicate<REQ, CTX> predicate, final ReactiveHandler<CTX, REQ, REP> target) {
- this.predicate = checkNotNull(predicate, "predicate must not be null");
- this.handler = checkNotNull(target, "handler must not be null");
- }
-}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/core/LdapEndpointConfigManager.java b/opendj-server-legacy/src/main/java/org/opends/server/core/LdapEndpointConfigManager.java
deleted file mode 100644
index 32fdbf8..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/core/LdapEndpointConfigManager.java
+++ /dev/null
@@ -1,798 +0,0 @@
-/*
- * The contents of this file are subject to the terms of the Common Development and
- * Distribution License (the License). You may not use this file except in compliance with the
- * License.
- *
- * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
- * specific language governing permission and limitations under the License.
- *
- * When distributing Covered Software, include this CDDL Header Notice in each file and include
- * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
- * Header, with the fields enclosed by brackets [] replaced by your own identifying
- * information: "Portions Copyright [year] [name of copyright owner]".
- *
- * Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions Copyright 2014-2016 ForgeRock AS.
- */
-package org.opends.server.core;
-
-import static org.forgerock.opendj.ldap.ResultCode.*;
-import static org.opends.messages.ConfigMessages.*;
-import static org.opends.server.core.DirectoryServer.*;
-import static org.opends.server.util.StaticUtils.*;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.forgerock.i18n.LocalizableMessage;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.forgerock.opendj.config.server.ConfigChangeResult;
-import org.forgerock.opendj.config.server.ConfigException;
-import org.forgerock.opendj.config.server.ConfigurationAddListener;
-import org.forgerock.opendj.config.server.ConfigurationChangeListener;
-import org.forgerock.opendj.config.server.ConfigurationDeleteListener;
-import org.forgerock.opendj.ldap.DN;
-import org.forgerock.opendj.ldap.ResultCode;
-import org.forgerock.opendj.server.config.meta.BackendCfgDefn;
-import org.forgerock.opendj.server.config.server.BackendCfg;
-import org.forgerock.opendj.server.config.server.RootCfg;
-import org.opends.server.api.Backend;
-import org.opends.server.api.BackendInitializationListener;
-import org.opends.server.backends.ConfigurationBackend;
-import org.opends.server.config.ConfigConstants;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
-import org.opends.server.types.InitializationException;
-import org.opends.server.types.WritabilityMode;
-
-/**
- * This class defines a utility that will be used to manage the configuration
- * for the set of backends defined in the Directory Server. It will perform
- * the necessary initialization of those backends when the server is first
- * started, and then will manage any changes to them while the server is
- * running.
- */
-public class LdapEndpointConfigManager implements
- ConfigurationChangeListener<BackendCfg>,
- ConfigurationAddListener<BackendCfg>,
- ConfigurationDeleteListener<BackendCfg>
-{
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
- /** The mapping between configuration entry DNs and their corresponding backend implementations. */
- private final ConcurrentHashMap<DN, Backend<? extends BackendCfg>> registeredBackends = new ConcurrentHashMap<>();
- private final ServerContext serverContext;
-
- /**
- * Creates a new instance of this backend config manager.
- *
- * @param serverContext
- * The server context.
- */
- public LdapEndpointConfigManager(ServerContext serverContext)
- {
- this.serverContext = serverContext;
- }
-
- /**
- * Initializes the configuration associated with the Directory Server
- * backends. This should only be called at Directory Server startup.
- *
- * @param backendIDsToStart
- * The list of backendID to start. Everything will be started if empty.
- * @throws ConfigException
- * If a critical configuration problem prevents the backend
- * initialization from succeeding.
- * @throws InitializationException
- * If a problem occurs while initializing the backends that is not
- * related to the server configuration.
- */
- public void initializeBackendConfig(Collection<String> backendIDsToStart)
- throws ConfigException, InitializationException
- {
- initializeConfigurationBackend();
-
- // Register add and delete listeners.
- RootCfg root = serverContext.getRootConfig();
- root.addBackendAddListener(this);
- root.addBackendDeleteListener(this);
-
- // Get the configuration entry that is at the root of all the backends in
- // the server.
- Entry backendRoot;
- try
- {
- DN configEntryDN = DN.valueOf(ConfigConstants.DN_BACKEND_BASE);
- backendRoot = DirectoryServer.getConfigEntry(configEntryDN);
- }
- catch (Exception e)
- {
- logger.traceException(e);
-
- LocalizableMessage message =
- ERR_CONFIG_BACKEND_CANNOT_GET_CONFIG_BASE.get(getExceptionMessage(e));
- throw new ConfigException(message, e);
- }
-
-
- // If the configuration root entry is null, then assume it doesn't exist.
- // In that case, then fail. At least that entry must exist in the
- // configuration, even if there are no backends defined below it.
- if (backendRoot == null)
- {
- throw new ConfigException(ERR_CONFIG_BACKEND_BASE_DOES_NOT_EXIST.get());
- }
- initializeBackends(backendIDsToStart, root);
- }
-
- /**
- * Initializes specified backends. If a backend has been already initialized, do nothing.
- * This should only be called at Directory Server startup, after #initializeBackendConfig()
- *
- * @param backendIDsToStart
- * The list of backendID to start. Everything will be started if empty.
- * @param root
- * The configuration of the server's Root backend
- * @throws ConfigException
- * If a critical configuration problem prevents the backend
- * initialization from succeeding.
- */
- public void initializeBackends(Collection<String> backendIDsToStart, RootCfg root) throws ConfigException
- {
- // Initialize existing backends.
- for (String name : root.listBackends())
- {
- // Get the handler's configuration.
- // This will decode and validate its properties.
- final BackendCfg backendCfg = root.getBackend(name);
- final String backendID = backendCfg.getBackendId();
- if (!backendIDsToStart.isEmpty() && !backendIDsToStart.contains(backendID))
- {
- continue;
- }
- if (DirectoryServer.hasBackend(backendID))
- {
- // Skip this backend if it is already initialized and registered as available.
- continue;
- }
-
- // Register as a change listener for this backend so that we can be
- // notified when it is disabled or enabled.
- backendCfg.addChangeListener(this);
-
- final DN backendDN = backendCfg.dn();
- if (!backendCfg.isEnabled())
- {
- logger.debug(INFO_CONFIG_BACKEND_DISABLED, backendDN);
- continue;
- }
-
- // See if the entry contains an attribute that specifies the class name
- // for the backend implementation. If it does, then load it and make
- // sure that it's a valid backend implementation. There is no such
- // attribute, the specified class cannot be loaded, or it does not
- // contain a valid backend implementation, then log an error and skip it.
- String className = backendCfg.getJavaClass();
-
- Backend<? extends BackendCfg> backend;
- try
- {
- backend = loadBackendClass(className).newInstance();
- }
- catch (Exception e)
- {
- logger.traceException(e);
- logger.error(ERR_CONFIG_BACKEND_CANNOT_INSTANTIATE, className, backendDN, stackTraceToSingleLineString(e));
- continue;
- }
-
- initializeBackend(backend, backendCfg);
- }
- }
-
- private void initializeConfigurationBackend() throws InitializationException
- {
- final ConfigurationBackend configBackend =
- new ConfigurationBackend(serverContext, DirectoryServer.getConfigurationHandler());
- initializeBackend(configBackend, configBackend.getBackendCfg());
- }
-
- private void initializeBackend(Backend<? extends BackendCfg> backend, BackendCfg backendCfg)
- {
- ConfigChangeResult ccr = new ConfigChangeResult();
- initializeBackend(backend, backendCfg, ccr);
- for (LocalizableMessage msg : ccr.getMessages())
- {
- logger.error(msg);
- }
- }
-
- private void initializeBackend(Backend<? extends BackendCfg> backend, BackendCfg backendCfg, ConfigChangeResult ccr)
- {
- backend.setBackendID(backendCfg.getBackendId());
- backend.setWritabilityMode(toWritabilityMode(backendCfg.getWritabilityMode()));
-
- if (acquireSharedLock(backend, backendCfg.getBackendId(), ccr) && configureAndOpenBackend(backend, backendCfg, ccr))
- {
- registerBackend(backend, backendCfg, ccr);
- }
- }
-
- /**
- * Acquire a shared lock on this backend. This will prevent operations like LDIF import or restore
- * from occurring while the backend is active.
- */
- private boolean acquireSharedLock(Backend<?> backend, String backendID, final ConfigChangeResult ccr)
- {
- try
- {
- String lockFile = LockFileManager.getBackendLockFileName(backend);
- StringBuilder failureReason = new StringBuilder();
- if (!LockFileManager.acquireSharedLock(lockFile, failureReason))
- {
- cannotAcquireLock(backendID, ccr, failureReason);
- return false;
- }
- return true;
- }
- catch (Exception e)
- {
- logger.traceException(e);
-
- cannotAcquireLock(backendID, ccr, stackTraceToSingleLineString(e));
- return false;
- }
- }
-
- private void cannotAcquireLock(String backendID, final ConfigChangeResult ccr, CharSequence failureReason)
- {
- LocalizableMessage message = ERR_CONFIG_BACKEND_CANNOT_ACQUIRE_SHARED_LOCK.get(backendID, failureReason);
- logger.error(message);
-
- // FIXME -- Do we need to send an admin alert?
- ccr.setResultCode(ResultCode.CONSTRAINT_VIOLATION);
- ccr.setAdminActionRequired(true);
- ccr.addMessage(message);
- }
-
- private void releaseSharedLock(Backend<?> backend, String backendID)
- {
- try
- {
- String lockFile = LockFileManager.getBackendLockFileName(backend);
- StringBuilder failureReason = new StringBuilder();
- if (! LockFileManager.releaseLock(lockFile, failureReason))
- {
- logger.warn(WARN_CONFIG_BACKEND_CANNOT_RELEASE_SHARED_LOCK, backendID, failureReason);
- // FIXME -- Do we need to send an admin alert?
- }
- }
- catch (Exception e2)
- {
- logger.traceException(e2);
-
- logger.warn(WARN_CONFIG_BACKEND_CANNOT_RELEASE_SHARED_LOCK, backendID, stackTraceToSingleLineString(e2));
- // FIXME -- Do we need to send an admin alert?
- }
- }
-
- @Override
- public boolean isConfigurationChangeAcceptable(
- BackendCfg configEntry,
- List<LocalizableMessage> unacceptableReason)
- {
- DN backendDN = configEntry.dn();
-
-
- Set<DN> baseDNs = configEntry.getBaseDN();
-
- // See if the backend is registered with the server. If it is, then
- // see what's changed and whether those changes are acceptable.
- Backend<?> backend = registeredBackends.get(backendDN);
- if (backend != null)
- {
- LinkedHashSet<DN> removedDNs = new LinkedHashSet<>(backend.getBaseDNs());
- LinkedHashSet<DN> addedDNs = new LinkedHashSet<>(baseDNs);
- Iterator<DN> iterator = removedDNs.iterator();
- while (iterator.hasNext())
- {
- DN dn = iterator.next();
- if (addedDNs.remove(dn))
- {
- iterator.remove();
- }
- }
-
- // Copy the directory server's base DN registry and make the
- // requested changes to see if it complains.
- BaseDnRegistry reg = DirectoryServer.copyBaseDnRegistry();
- for (DN dn : removedDNs)
- {
- try
- {
- reg.deregisterBaseDN(dn);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- unacceptableReason.add(de.getMessageObject());
- return false;
- }
- }
-
- for (DN dn : addedDNs)
- {
- try
- {
- reg.registerBaseDN(dn, backend, false);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- unacceptableReason.add(de.getMessageObject());
- return false;
- }
- }
- }
- else if (configEntry.isEnabled())
- {
- /*
- * If the backend was not enabled, it has not been registered with directory server, so
- * no listeners will be registered at the lower layers. Verify as it was an add.
- */
- String className = configEntry.getJavaClass();
- try
- {
- Class<Backend<BackendCfg>> backendClass = loadBackendClass(className);
- if (! Backend.class.isAssignableFrom(backendClass))
- {
- unacceptableReason.add(ERR_CONFIG_BACKEND_CLASS_NOT_BACKEND.get(className, backendDN));
- return false;
- }
-
- Backend<BackendCfg> b = backendClass.newInstance();
- if (! b.isConfigurationAcceptable(configEntry, unacceptableReason, serverContext))
- {
- return false;
- }
- }
- catch (Exception e)
- {
- logger.traceException(e);
- unacceptableReason.add(
- ERR_CONFIG_BACKEND_CANNOT_INSTANTIATE.get(className, backendDN, stackTraceToSingleLineString(e)));
- return false;
- }
- }
-
- // If we've gotten to this point, then it is acceptable as far as we are
- // concerned. If it is unacceptable according to the configuration for that
- // backend, then the backend itself will need to make that determination.
- return true;
- }
-
- @Override
- public ConfigChangeResult applyConfigurationChange(BackendCfg cfg)
- {
- DN backendDN = cfg.dn();
- Backend<? extends BackendCfg> backend = registeredBackends.get(backendDN);
- final ConfigChangeResult ccr = new ConfigChangeResult();
-
- // See if the entry contains an attribute that indicates whether the
- // backend should be enabled.
- boolean needToEnable = false;
- try
- {
- if (cfg.isEnabled())
- {
- // The backend is marked as enabled. See if that is already true.
- if (backend == null)
- {
- needToEnable = true;
- } // else already enabled, no need to do anything.
- }
- else
- {
- // The backend is marked as disabled. See if that is already true.
- if (backend != null)
- {
- // It isn't disabled, so we will do so now and deregister it from the
- // Directory Server.
- deregisterBackend(backendDN, backend);
-
- backend.finalizeBackend();
-
- releaseSharedLock(backend, backend.getBackendID());
-
- return ccr;
- } // else already disabled, no need to do anything.
- }
- }
- catch (Exception e)
- {
- logger.traceException(e);
-
- ccr.setResultCode(DirectoryServer.getServerErrorResultCode());
- ccr.addMessage(ERR_CONFIG_BACKEND_UNABLE_TO_DETERMINE_ENABLED_STATE.get(backendDN,
- stackTraceToSingleLineString(e)));
- return ccr;
- }
-
- // See if the entry contains an attribute that specifies the class name
- // for the backend implementation. If it does, then load it and make sure
- // that it's a valid backend implementation. There is no such attribute,
- // the specified class cannot be loaded, or it does not contain a valid
- // backend implementation, then log an error and skip it.
- String className = cfg.getJavaClass();
-
- // See if this backend is currently active and if so if the name of the class is the same.
- if (backend != null && !className.equals(backend.getClass().getName()))
- {
- // It is not the same. Try to load it and see if it is a valid backend implementation.
- try
- {
- Class<?> backendClass = DirectoryServer.loadClass(className);
- if (Backend.class.isAssignableFrom(backendClass))
- {
- // It appears to be a valid backend class. We'll return that the
- // change is successful, but indicate that some administrative
- // action is required.
- ccr.addMessage(NOTE_CONFIG_BACKEND_ACTION_REQUIRED_TO_CHANGE_CLASS.get(
- backendDN, backend.getClass().getName(), className));
- ccr.setAdminActionRequired(true);
- }
- else
- {
- // It is not a valid backend class. This is an error.
- ccr.setResultCode(ResultCode.CONSTRAINT_VIOLATION);
- ccr.addMessage(ERR_CONFIG_BACKEND_CLASS_NOT_BACKEND.get(className, backendDN));
- }
- return ccr;
- }
- catch (Exception e)
- {
- logger.traceException(e);
-
- ccr.setResultCode(DirectoryServer.getServerErrorResultCode());
- ccr.addMessage(ERR_CONFIG_BACKEND_CANNOT_INSTANTIATE.get(
- className, backendDN, stackTraceToSingleLineString(e)));
- return ccr;
- }
- }
-
-
- // If we've gotten here, then that should mean that we need to enable the
- // backend. Try to do so.
- if (needToEnable)
- {
- try
- {
- backend = loadBackendClass(className).newInstance();
- }
- catch (Exception e)
- {
- // It is not a valid backend class. This is an error.
- ccr.setResultCode(ResultCode.CONSTRAINT_VIOLATION);
- ccr.addMessage(ERR_CONFIG_BACKEND_CLASS_NOT_BACKEND.get(className, backendDN));
- return ccr;
- }
-
- initializeBackend(backend, cfg, ccr);
- return ccr;
- }
- else if (ccr.getResultCode() == ResultCode.SUCCESS && backend != null)
- {
- backend.setWritabilityMode(toWritabilityMode(cfg.getWritabilityMode()));
- }
-
- return ccr;
- }
-
- private boolean registerBackend(Backend<? extends BackendCfg> backend, BackendCfg backendCfg, ConfigChangeResult ccr)
- {
- for (BackendInitializationListener listener : getBackendInitializationListeners())
- {
- listener.performBackendPreInitializationProcessing(backend);
- }
-
- try
- {
- DirectoryServer.registerBackend(backend);
- }
- catch (Exception e)
- {
- logger.traceException(e);
-
- LocalizableMessage message =
- WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(backendCfg.getBackendId(), getExceptionMessage(e));
- logger.error(message);
-
- // FIXME -- Do we need to send an admin alert?
- ccr.setResultCode(DirectoryServer.getServerErrorResultCode());
- ccr.addMessage(message);
- return false;
- }
-
- for (BackendInitializationListener listener : getBackendInitializationListeners())
- {
- listener.performBackendPostInitializationProcessing(backend);
- }
-
- registeredBackends.put(backendCfg.dn(), backend);
- return true;
- }
-
- @Override
- public boolean isConfigurationAddAcceptable(
- BackendCfg configEntry,
- List<LocalizableMessage> unacceptableReason)
- {
- DN backendDN = configEntry.dn();
-
-
- // See if the entry contains an attribute that specifies the backend ID. If
- // it does not, then skip it.
- String backendID = configEntry.getBackendId();
- if (DirectoryServer.hasBackend(backendID))
- {
- unacceptableReason.add(WARN_CONFIG_BACKEND_DUPLICATE_BACKEND_ID.get(backendDN, backendID));
- return false;
- }
-
-
- // See if the entry contains an attribute that specifies the set of base DNs
- // for the backend. If it does not, then skip it.
- Set<DN> baseList = configEntry.getBaseDN();
- DN[] baseDNs = new DN[baseList.size()];
- baseList.toArray(baseDNs);
-
-
- // See if the entry contains an attribute that specifies the class name
- // for the backend implementation. If it does, then load it and make sure
- // that it's a valid backend implementation. There is no such attribute,
- // the specified class cannot be loaded, or it does not contain a valid
- // backend implementation, then log an error and skip it.
- String className = configEntry.getJavaClass();
-
- Backend<BackendCfg> backend;
- try
- {
- backend = loadBackendClass(className).newInstance();
- }
- catch (Exception e)
- {
- logger.traceException(e);
-
- unacceptableReason.add(ERR_CONFIG_BACKEND_CANNOT_INSTANTIATE.get(
- className, backendDN, stackTraceToSingleLineString(e)));
- return false;
- }
-
-
- // Make sure that all of the base DNs are acceptable for use in the server.
- BaseDnRegistry reg = DirectoryServer.copyBaseDnRegistry();
- for (DN baseDN : baseDNs)
- {
- try
- {
- reg.registerBaseDN(baseDN, backend, false);
- }
- catch (DirectoryException de)
- {
- unacceptableReason.add(de.getMessageObject());
- return false;
- }
- catch (Exception e)
- {
- unacceptableReason.add(getExceptionMessage(e));
- return false;
- }
- }
-
- return backend.isConfigurationAcceptable(configEntry, unacceptableReason, serverContext);
- }
-
- @Override
- public ConfigChangeResult applyConfigurationAdd(BackendCfg cfg)
- {
- DN backendDN = cfg.dn();
- final ConfigChangeResult ccr = new ConfigChangeResult();
-
- // Register as a change listener for this backend entry so that we will
- // be notified of any changes that may be made to it.
- cfg.addChangeListener(this);
-
- // See if the entry contains an attribute that indicates whether the backend should be enabled.
- // If it does not, or if it is not set to "true", then skip it.
- if (!cfg.isEnabled())
- {
- // The backend is explicitly disabled. We will log a message to
- // indicate that it won't be enabled and return.
- LocalizableMessage message = INFO_CONFIG_BACKEND_DISABLED.get(backendDN);
- logger.debug(message);
- ccr.addMessage(message);
- return ccr;
- }
-
-
-
- // See if the entry contains an attribute that specifies the backend ID. If
- // it does not, then skip it.
- String backendID = cfg.getBackendId();
- if (DirectoryServer.hasBackend(backendID))
- {
- LocalizableMessage message = WARN_CONFIG_BACKEND_DUPLICATE_BACKEND_ID.get(backendDN, backendID);
- logger.warn(message);
- ccr.addMessage(message);
- return ccr;
- }
-
-
- // See if the entry contains an attribute that specifies the class name
- // for the backend implementation. If it does, then load it and make sure
- // that it's a valid backend implementation. There is no such attribute,
- // the specified class cannot be loaded, or it does not contain a valid
- // backend implementation, then log an error and skip it.
- String className = cfg.getJavaClass();
-
- Backend<? extends BackendCfg> backend;
- try
- {
- backend = loadBackendClass(className).newInstance();
- }
- catch (Exception e)
- {
- logger.traceException(e);
-
- ccr.setResultCode(DirectoryServer.getServerErrorResultCode());
- ccr.addMessage(ERR_CONFIG_BACKEND_CANNOT_INSTANTIATE.get(
- className, backendDN, stackTraceToSingleLineString(e)));
- return ccr;
- }
-
- initializeBackend(backend, cfg, ccr);
- return ccr;
- }
-
- private boolean configureAndOpenBackend(Backend<?> backend, BackendCfg cfg, ConfigChangeResult ccr)
- {
- try
- {
- configureAndOpenBackend(backend, cfg);
- return true;
- }
- catch (Exception e)
- {
- logger.traceException(e);
-
- ccr.setResultCode(DirectoryServer.getServerErrorResultCode());
- ccr.addMessage(ERR_CONFIG_BACKEND_CANNOT_INITIALIZE.get(
- cfg.getJavaClass(), cfg.dn(), stackTraceToSingleLineString(e)));
-
- releaseSharedLock(backend, cfg.getBackendId());
- return false;
- }
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private void configureAndOpenBackend(Backend backend, BackendCfg cfg) throws ConfigException, InitializationException
- {
- backend.configureBackend(cfg, serverContext);
- backend.openBackend();
- }
-
- @SuppressWarnings("unchecked")
- private Class<Backend<BackendCfg>> loadBackendClass(String className) throws Exception
- {
- return (Class<Backend<BackendCfg>>) DirectoryServer.loadClass(className);
- }
-
- private WritabilityMode toWritabilityMode(BackendCfgDefn.WritabilityMode writabilityMode)
- {
- switch (writabilityMode)
- {
- case DISABLED:
- return WritabilityMode.DISABLED;
- case ENABLED:
- return WritabilityMode.ENABLED;
- case INTERNAL_ONLY:
- return WritabilityMode.INTERNAL_ONLY;
- default:
- return WritabilityMode.ENABLED;
- }
- }
-
- @Override
- public boolean isConfigurationDeleteAcceptable(
- BackendCfg configEntry,
- List<LocalizableMessage> unacceptableReason)
- {
- DN backendDN = configEntry.dn();
-
-
- // See if this backend config manager has a backend registered with the
- // provided DN. If not, then we don't care if the entry is deleted. If we
- // do know about it, then that means that it is enabled and we will not
- // allow removing a backend that is enabled.
- Backend<?> backend = registeredBackends.get(backendDN);
- if (backend == null)
- {
- return true;
- }
-
-
- // See if the backend has any subordinate backends. If so, then it is not
- // acceptable to remove it. Otherwise, it should be fine.
- Backend<?>[] subBackends = backend.getSubordinateBackends();
- if (subBackends != null && subBackends.length != 0)
- {
- unacceptableReason.add(NOTE_CONFIG_BACKEND_CANNOT_REMOVE_BACKEND_WITH_SUBORDINATES.get(backendDN));
- return false;
- }
- return true;
- }
-
- @Override
- public ConfigChangeResult applyConfigurationDelete(BackendCfg configEntry)
- {
- DN backendDN = configEntry.dn();
- final ConfigChangeResult ccr = new ConfigChangeResult();
-
- // See if this backend config manager has a backend registered with the
- // provided DN. If not, then we don't care if the entry is deleted.
- Backend<?> backend = registeredBackends.get(backendDN);
- if (backend == null)
- {
- return ccr;
- }
-
- // See if the backend has any subordinate backends. If so, then it is not
- // acceptable to remove it. Otherwise, it should be fine.
- Backend<?>[] subBackends = backend.getSubordinateBackends();
- if (subBackends != null && subBackends.length > 0)
- {
- ccr.setResultCode(UNWILLING_TO_PERFORM);
- ccr.addMessage(NOTE_CONFIG_BACKEND_CANNOT_REMOVE_BACKEND_WITH_SUBORDINATES.get(backendDN));
- return ccr;
- }
-
- deregisterBackend(backendDN, backend);
-
- try
- {
- backend.finalizeBackend();
- }
- catch (Exception e)
- {
- logger.traceException(e);
- }
-
- configEntry.removeChangeListener(this);
-
- releaseSharedLock(backend, backend.getBackendID());
-
- return ccr;
- }
-
- private void deregisterBackend(DN backendDN, Backend<?> backend)
- {
- for (BackendInitializationListener listener : getBackendInitializationListeners())
- {
- listener.performBackendPreFinalizationProcessing(backend);
- }
-
- registeredBackends.remove(backendDN);
- DirectoryServer.deregisterBackend(backend);
-
- for (BackendInitializationListener listener : getBackendInitializationListeners())
- {
- listener.performBackendPostFinalizationProcessing(backend);
- }
- }
-}
--
Gitblit v1.10.0