From 464f6dd52a1eeeef0f7b38d4dc1840501818a36f Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 3273 ++++++++++++++++++++++++++--------------------------------
1 files changed, 1,472 insertions(+), 1,801 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 0e14bea..96304a7 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -105,1015 +105,848 @@
import io.reactivex.FlowableOnSubscribe;
/**
- * This class defines an LDAP client connection, which is a type of
- * client connection that will be accepted by an instance of the LDAP
- * connection handler and have its requests decoded by an LDAP request
- * handler.
+ * This class defines an LDAP client connection, which is a type of client connection that will be accepted by an
+ * instance of the LDAP connection handler and have its requests decoded by an LDAP request handler.
*/
-public final class LDAPClientConnection2 extends ClientConnection implements
- TLSCapableConnection, ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>>
-{
- private static final String REACTIVE_OUT = "reactive.out";
+public final class LDAPClientConnection2 extends ClientConnection implements TLSCapableConnection,
+ ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>> {
+ private static final String REACTIVE_OUT = "reactive.out";
-/** The tracer object for the debug logger. */
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+ /** The tracer object for the debug logger. */
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
- /** The time that the last operation was completed. */
- private final AtomicLong lastCompletionTime;
- /** The next operation ID that should be used for this connection. */
- private final AtomicLong nextOperationID;
+ /** The time that the last operation was completed. */
+ private final AtomicLong lastCompletionTime;
+ /** The next operation ID that should be used for this connection. */
+ private final AtomicLong nextOperationID;
- /**
- * Indicates whether the Directory Server believes this connection to be valid
- * and available for communication.
- */
- private volatile boolean connectionValid;
+ /**
+ * Indicates whether the Directory Server believes this connection to be valid and available for communication.
+ */
+ private volatile boolean connectionValid;
- /**
- * Indicates whether this connection is about to be closed. This will be used
- * to prevent accepting new requests while a disconnect is in progress.
- */
- private boolean disconnectRequested;
+ /**
+ * Indicates whether this connection is about to be closed. This will be used to prevent accepting new requests
+ * while a disconnect is in progress.
+ */
+ private boolean disconnectRequested;
- /**
- * Indicates whether the connection should keep statistics regarding the
- * operations that it is performing.
- */
- private final boolean keepStats;
+ /**
+ * Indicates whether the connection should keep statistics regarding the operations that it is performing.
+ */
+ private final boolean keepStats;
- /** The set of all operations currently in progress on this connection. */
- private final ConcurrentHashMap<Integer, Operation> operationsInProgress;
+ /** The set of all operations currently in progress on this connection. */
+ private final ConcurrentHashMap<Integer, Operation> operationsInProgress;
- /**
- * The number of operations performed on this connection. Used to compare with
- * the resource limits of the network group.
- */
- private final AtomicLong operationsPerformed;
+ /**
+ * The number of operations performed on this connection. Used to compare with the resource limits of the network
+ * group.
+ */
+ private final AtomicLong operationsPerformed;
- /** The port on the client from which this connection originated. */
- private final int clientPort;
- /** The LDAP version that the client is using to communicate with the server. */
- private int ldapVersion;
- /** The port on the server to which this client has connected. */
- private final int serverPort;
+ /** The port on the client from which this connection originated. */
+ private final int clientPort;
+ /** The LDAP version that the client is using to communicate with the server. */
+ private int ldapVersion;
+ /** The port on the server to which this client has connected. */
+ private final int serverPort;
- /** The reference to the connection handler that accepted this connection. */
- private final LDAPConnectionHandler2 connectionHandler;
- /** The statistics tracker associated with this client connection. */
- private final LDAPStatistics statTracker;
- private final boolean useNanoTime;
+ /** The reference to the connection handler that accepted this connection. */
+ private final LDAPConnectionHandler2 connectionHandler;
+ /** The statistics tracker associated with this client connection. */
+ private final LDAPStatistics statTracker;
+ private final boolean useNanoTime;
- /** The connection ID assigned to this connection. */
- private final long connectionID;
+ /** The connection ID assigned to this connection. */
+ private final long connectionID;
- /** The lock used to provide threadsafe access to the set of operations in progress. */
- private final Object opsInProgressLock;
+ /** The lock used to provide threadsafe access to the set of operations in progress. */
+ private final Object opsInProgressLock;
- /** The socket channel with which this client connection is associated. */
- private final LDAPClientContext clientContext;
+ /** The socket channel with which this client connection is associated. */
+ private final LDAPClientContext clientContext;
- /** The string representation of the address of the client. */
- private final String clientAddress;
- /** The name of the protocol that the client is using to communicate with the server. */
- private final String protocol;
- /** The string representation of the address of the server to which the client has connected. */
- private final String serverAddress;
+ /** The string representation of the address of the client. */
+ private final String clientAddress;
+ /** The name of the protocol that the client is using to communicate with the server. */
+ private final String protocol;
+ /** The string representation of the address of the server to which the client has connected. */
+ private final String serverAddress;
- /**
- * Creates a new LDAP client connection with the provided information.
- *
- * @param connectionHandler
- * The connection handler that accepted this connection.
- * @param clientContext
- * The socket channel that may be used to communicate with
- * the client.
- * @param protocol String representing the protocol (LDAP or LDAP+SSL).
- * @throws LdapException
- * @throws DirectoryException If SSL initialisation fails.
- */
- LDAPClientConnection2(LDAPConnectionHandler2 connectionHandler, LDAPClientContext clientContext, String protocol,
- boolean keepStats)
- {
- this.connectionHandler = connectionHandler;
- this.clientContext = clientContext;
- opsInProgressLock = new Object();
- ldapVersion = 3;
- lastCompletionTime = new AtomicLong(TimeThread.getTime());
- nextOperationID = new AtomicLong(0);
- connectionValid = true;
- disconnectRequested = false;
- operationsInProgress = new ConcurrentHashMap<>();
- operationsPerformed = new AtomicLong(0);
- this.keepStats = keepStats;
- this.protocol = protocol;
+ /**
+ * Creates a new LDAP client connection with the provided information.
+ *
+ * @param connectionHandler
+ * The connection handler that accepted this connection.
+ * @param clientContext
+ * The socket channel that may be used to communicate with the client.
+ * @param protocol
+ * String representing the protocol (LDAP or LDAP+SSL).
+ * @throws LdapException
+ * @throws DirectoryException
+ * If SSL initialisation fails.
+ */
+ LDAPClientConnection2(LDAPConnectionHandler2 connectionHandler, LDAPClientContext clientContext, String protocol,
+ boolean keepStats) {
+ this.connectionHandler = connectionHandler;
+ this.clientContext = clientContext;
+ opsInProgressLock = new Object();
+ ldapVersion = 3;
+ lastCompletionTime = new AtomicLong(TimeThread.getTime());
+ nextOperationID = new AtomicLong(0);
+ connectionValid = true;
+ disconnectRequested = false;
+ operationsInProgress = new ConcurrentHashMap<>();
+ operationsPerformed = new AtomicLong(0);
+ this.keepStats = keepStats;
+ this.protocol = protocol;
- clientAddress = clientContext.getPeerAddress().getAddress().getHostAddress();
- clientPort = clientContext.getPeerAddress().getPort();
- serverAddress = clientContext.getLocalAddress().getAddress().getHostAddress();
- serverPort = clientContext.getLocalAddress().getPort();
+ clientAddress = clientContext.getPeerAddress().getAddress().getHostAddress();
+ clientPort = clientContext.getPeerAddress().getPort();
+ serverAddress = clientContext.getLocalAddress().getAddress().getHostAddress();
+ serverPort = clientContext.getLocalAddress().getPort();
- statTracker = this.connectionHandler.getStatTracker();
- if (keepStats)
- {
- statTracker.updateConnect();
- this.useNanoTime = DirectoryServer.getUseNanoTime();
- }
- else
- {
- this.useNanoTime = false;
- }
-
- connectionID = DirectoryServer.newConnectionAccepted(this);
- clientContext.onDisconnect(new DisconnectListener()
- {
- @Override
- public void exceptionOccurred(LDAPClientContext context, Throwable error)
- {
- if (error instanceof LocalizableException)
- {
- disconnect(DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
- }
- else
- {
- disconnect(DisconnectReason.PROTOCOL_ERROR, true, null);
- }
- }
-
- @Override
- public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage)
- {
- disconnect(DisconnectReason.SERVER_ERROR, false, null);
- }
-
- @Override
- public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest)
- {
- disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
- }
- });
- }
-
- /**
- * Retrieves the connection ID assigned to this connection.
- *
- * @return The connection ID assigned to this connection.
- */
- @Override
- public long getConnectionID()
- {
- return connectionID;
- }
-
- /**
- * Retrieves the connection handler that accepted this client
- * connection.
- *
- * @return The connection handler that accepted this client
- * connection.
- */
- @Override
- public ConnectionHandler<?> getConnectionHandler()
- {
- return connectionHandler;
- }
-
- /**
- * Retrieves the socket channel that can be used to communicate with
- * the client.
- *
- * @return The socket channel that can be used to communicate with the
- * client.
- */
- @Override
- public SocketChannel getSocketChannel()
- {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Retrieves the protocol that the client is using to communicate with
- * the Directory Server.
- *
- * @return The protocol that the client is using to communicate with
- * the Directory Server.
- */
- @Override
- public String getProtocol()
- {
- return protocol;
- }
-
- /**
- * Retrieves a string representation of the address of the client.
- *
- * @return A string representation of the address of the client.
- */
- @Override
- public String getClientAddress()
- {
- return clientAddress;
- }
-
- /**
- * Retrieves the port number for this connection on the client system.
- *
- * @return The port number for this connection on the client system.
- */
- @Override
- public int getClientPort()
- {
- return clientPort;
- }
-
- /**
- * Retrieves a string representation of the address on the server to
- * which the client connected.
- *
- * @return A string representation of the address on the server to
- * which the client connected.
- */
- @Override
- public String getServerAddress()
- {
- return serverAddress;
- }
-
- /**
- * Retrieves the port number for this connection on the server system.
- *
- * @return The port number for this connection on the server system.
- */
- @Override
- public int getServerPort()
- {
- return serverPort;
- }
-
- /**
- * Retrieves the <CODE>java.net.InetAddress</CODE> associated with the
- * remote client system.
- *
- * @return The <CODE>java.net.InetAddress</CODE> associated with the
- * remote client system. It may be <CODE>null</CODE> if the
- * client is not connected over an IP-based connection.
- */
- @Override
- public InetAddress getRemoteAddress()
- {
- return clientContext.getPeerAddress().getAddress();
- }
-
- /**
- * Retrieves the <CODE>java.net.InetAddress</CODE> for the Directory
- * Server system to which the client has established the connection.
- *
- * @return The <CODE>java.net.InetAddress</CODE> for the Directory
- * Server system to which the client has established the
- * connection. It may be <CODE>null</CODE> if the client is
- * not connected over an IP-based connection.
- */
- @Override
- public InetAddress getLocalAddress()
- {
- return clientContext.getLocalAddress().getAddress();
- }
-
- @Override
- public boolean isConnectionValid()
- {
- return this.connectionValid;
- }
-
- /**
- * Indicates whether this client connection is currently using a
- * secure mechanism to communicate with the server. Note that this may
- * change over time based on operations performed by the client or
- * server (e.g., it may go from <CODE>false</CODE> to
- * <CODE>true</CODE> if the client uses the StartTLS extended
- * operation).
- *
- * @return <CODE>true</CODE> if the client connection is currently
- * using a secure mechanism to communicate with the server, or
- * <CODE>false</CODE> if not.
- */
- @Override
- public boolean isSecure()
- {
- return false;
- }
-
- /**
- * Sends a response to the client based on the information in the
- * provided operation.
- *
- * @param operation
- * The operation for which to send the response.
- */
- @Override
- public void sendResponse(Operation operation)
- {
- // Since this is the final response for this operation, we can go
- // ahead and remove it from the "operations in progress" list. It
- // can't be canceled after this point, and this will avoid potential
- // race conditions in which the client immediately sends another
- // request with the same message ID as was used for this operation.
-
- if (keepStats) {
- long time;
- if (useNanoTime) {
- time = operation.getProcessingNanoTime();
+ statTracker = this.connectionHandler.getStatTracker();
+ if (keepStats) {
+ statTracker.updateConnect();
+ this.useNanoTime = DirectoryServer.getUseNanoTime();
} else {
- time = operation.getProcessingTime();
+ this.useNanoTime = false;
}
- this.statTracker.updateOperationMonitoringData(
- operation.getOperationType(),
- time);
+
+ connectionID = DirectoryServer.newConnectionAccepted(this);
+ clientContext.onDisconnect(new DisconnectListener() {
+ @Override
+ public void exceptionOccurred(LDAPClientContext context, Throwable error) {
+ if (error instanceof LocalizableException) {
+ disconnect(
+ DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
+ } else {
+ disconnect(DisconnectReason.PROTOCOL_ERROR, true, null);
+ }
+ }
+
+ @Override
+ public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
+ String diagnosticMessage) {
+ disconnect(DisconnectReason.SERVER_ERROR, false, null);
+ }
+
+ @Override
+ public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
+ disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
+ }
+ });
}
- // Avoid sending the response if one has already been sent. This may happen
- // if operation processing encounters a run-time exception after sending the
- // response: the worker thread exception handling code will attempt to send
- // an error result to the client indicating that a problem occurred.
- if (removeOperationInProgress(operation.getMessageID()))
- {
- final Response response = operationToResponse(operation);
- final FlowableEmitter<Response> out = getOut(operation);
- if (response != null)
- {
+ /**
+ * Retrieves the connection ID assigned to this connection.
+ *
+ * @return The connection ID assigned to this connection.
+ */
+ @Override
+ public long getConnectionID() {
+ return connectionID;
+ }
+
+ /**
+ * Retrieves the connection handler that accepted this client connection.
+ *
+ * @return The connection handler that accepted this client connection.
+ */
+ @Override
+ public ConnectionHandler<?> getConnectionHandler() {
+ return connectionHandler;
+ }
+
+ /**
+ * Retrieves the socket channel that can be used to communicate with the client.
+ *
+ * @return The socket channel that can be used to communicate with the client.
+ */
+ @Override
+ public SocketChannel getSocketChannel() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Retrieves the protocol that the client is using to communicate with the Directory Server.
+ *
+ * @return The protocol that the client is using to communicate with the Directory Server.
+ */
+ @Override
+ public String getProtocol() {
+ return protocol;
+ }
+
+ /**
+ * Retrieves a string representation of the address of the client.
+ *
+ * @return A string representation of the address of the client.
+ */
+ @Override
+ public String getClientAddress() {
+ return clientAddress;
+ }
+
+ /**
+ * Retrieves the port number for this connection on the client system.
+ *
+ * @return The port number for this connection on the client system.
+ */
+ @Override
+ public int getClientPort() {
+ return clientPort;
+ }
+
+ /**
+ * Retrieves a string representation of the address on the server to which the client connected.
+ *
+ * @return A string representation of the address on the server to which the client connected.
+ */
+ @Override
+ public String getServerAddress() {
+ return serverAddress;
+ }
+
+ /**
+ * Retrieves the port number for this connection on the server system.
+ *
+ * @return The port number for this connection on the server system.
+ */
+ @Override
+ public int getServerPort() {
+ return serverPort;
+ }
+
+ /**
+ * Retrieves the <CODE>java.net.InetAddress</CODE> associated with the remote client system.
+ *
+ * @return The <CODE>java.net.InetAddress</CODE> associated with the remote client system. It may be
+ * <CODE>null</CODE> if the client is not connected over an IP-based connection.
+ */
+ @Override
+ public InetAddress getRemoteAddress() {
+ return clientContext.getPeerAddress().getAddress();
+ }
+
+ /**
+ * Retrieves the <CODE>java.net.InetAddress</CODE> for the Directory Server system to which the client has
+ * established the connection.
+ *
+ * @return The <CODE>java.net.InetAddress</CODE> for the Directory Server system to which the client has established
+ * the connection. It may be <CODE>null</CODE> if the client is not connected over an IP-based connection.
+ */
+ @Override
+ public InetAddress getLocalAddress() {
+ return clientContext.getLocalAddress().getAddress();
+ }
+
+ @Override
+ public boolean isConnectionValid() {
+ return this.connectionValid;
+ }
+
+ /**
+ * Indicates whether this client connection is currently using a secure mechanism to communicate with the server.
+ * Note that this may change over time based on operations performed by the client or server (e.g., it may go from
+ * <CODE>false</CODE> to <CODE>true</CODE> if the client uses the StartTLS extended operation).
+ *
+ * @return <CODE>true</CODE> if the client connection is currently using a secure mechanism to communicate with the
+ * server, or <CODE>false</CODE> if not.
+ */
+ @Override
+ public boolean isSecure() {
+ return false;
+ }
+
+ /**
+ * Sends a response to the client based on the information in the provided operation.
+ *
+ * @param operation
+ * The operation for which to send the response.
+ */
+ @Override
+ public void sendResponse(Operation operation) {
+ // Since this is the final response for this operation, we can go
+ // ahead and remove it from the "operations in progress" list. It
+ // can't be canceled after this point, and this will avoid potential
+ // race conditions in which the client immediately sends another
+ // request with the same message ID as was used for this operation.
+
+ if (keepStats) {
+ long time;
+ if (useNanoTime) {
+ time = operation.getProcessingNanoTime();
+ } else {
+ time = operation.getProcessingTime();
+ }
+ this.statTracker.updateOperationMonitoringData(operation.getOperationType(), time);
+ }
+
+ // Avoid sending the response if one has already been sent. This may happen
+ // if operation processing encounters a run-time exception after sending the
+ // response: the worker thread exception handling code will attempt to send
+ // an error result to the client indicating that a problem occurred.
+ if (removeOperationInProgress(operation.getMessageID())) {
+ final Response response = operationToResponse(operation);
+ final FlowableEmitter<Response> out = getOut(operation);
+ if (response != null) {
+ out.onNext(response);
+ }
+ out.onComplete();
+ }
+ }
+
+ /**
+ * Retrieves an LDAPMessage containing a response generated from the provided operation.
+ *
+ * @param operation
+ * The operation to use to generate the response LDAPMessage.
+ * @return An LDAPMessage containing a response generated from the provided operation.
+ */
+ private Response operationToResponse(Operation operation) {
+ ResultCode resultCode = operation.getResultCode();
+ if (resultCode == null) {
+ // This must mean that the operation has either not yet completed
+ // or that it completed without a result for some reason. In any
+ // case, log a message and set the response to "operations error".
+ logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_NO_RESULT_CODE, operation.getOperationType(),
+ operation.getConnectionID(), operation.getOperationID());
+ resultCode = DirectoryServer.getServerErrorResultCode();
+ }
+
+ LocalizableMessageBuilder errorMessage = operation.getErrorMessage();
+ String matchedDN = operation.getMatchedDN() != null ? operation.getMatchedDN().toString() : null;
+
+ // Referrals are not allowed for LDAPv2 clients.
+ List<String> referralURLs;
+ if (ldapVersion == 2) {
+ referralURLs = null;
+
+ if (resultCode == ResultCode.REFERRAL) {
+ resultCode = ResultCode.CONSTRAINT_VIOLATION;
+ errorMessage.append(ERR_LDAPV2_REFERRAL_RESULT_CHANGED.get());
+ }
+
+ List<String> opReferrals = operation.getReferralURLs();
+ if (opReferrals != null && !opReferrals.isEmpty()) {
+ StringBuilder referralsStr = new StringBuilder();
+ Iterator<String> iterator = opReferrals.iterator();
+ referralsStr.append(iterator.next());
+
+ while (iterator.hasNext()) {
+ referralsStr.append(", ");
+ referralsStr.append(iterator.next());
+ }
+
+ errorMessage.append(ERR_LDAPV2_REFERRALS_OMITTED.get(referralsStr));
+ }
+ } else {
+ referralURLs = operation.getReferralURLs();
+ }
+
+ final Result result;
+ switch (operation.getOperationType()) {
+ case ADD:
+ result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN);
+ break;
+ case BIND:
+ result = Responses.newBindResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN)
+ .setServerSASLCredentials(((BindOperationBasis) operation).getServerSASLCredentials());
+ break;
+ case COMPARE:
+ result = Responses.newCompareResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN);
+ break;
+ case DELETE:
+ result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN);
+ break;
+ case EXTENDED:
+ // If this an LDAPv2 client, then we can't send this.
+ if (ldapVersion == 2) {
+ logger.error(ERR_LDAPV2_SKIPPING_EXTENDED_RESPONSE, getConnectionID(), operation.getOperationID(),
+ operation);
+ return null;
+ }
+
+ ExtendedOperationBasis extOp = (ExtendedOperationBasis) operation;
+ result = Responses.newGenericExtendedResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN).setOID(extOp.getResponseOID()).setValue(extOp.getResponseValue());
+ break;
+ case MODIFY:
+ result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN);
+ break;
+ case MODIFY_DN:
+ result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN);
+ break;
+ case SEARCH:
+ result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+ .setMatchedDN(matchedDN);
+ break;
+ default:
+ // This must be a type of operation that doesn't have a response.
+ // This shouldn't happen, so log a message and return.
+ logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_INVALID_OP, operation.getOperationType(), getConnectionID(),
+ operation.getOperationID(), operation);
+ return null;
+ }
+ if (referralURLs != null) {
+ result.getReferralURIs().addAll(referralURLs);
+ }
+
+ // Controls are not allowed for LDAPv2 clients.
+ if (ldapVersion != 2) {
+ for (Control control : operation.getResponseControls()) {
+ result.addControl(Converters.from(control));
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Sends the provided search result entry to the client.
+ *
+ * @param searchOperation
+ * The search operation with which the entry is associated
+ * @param searchEntry
+ * The search result entry to be sent to the client
+ */
+ @Override
+ public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry) {
+ getEmitter(searchOperation).onNext(toResponse(searchEntry));
+ }
+
+ private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation) {
+ return getOut(searchOperation);
+ }
+
+ private Response toResponse(SearchResultEntry searchEntry) {
+ return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
+ }
+
+ private FlowableEmitter<Response> getOut(Operation operation) {
+ return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
+ }
+
+ /**
+ * Sends the provided search result reference to the client.
+ *
+ * @param searchOperation
+ * The search operation with which the reference is associated.
+ * @param searchReference
+ * The search result reference to be sent to the client.
+ * @return <CODE>true</CODE> if the client is able to accept referrals, or <CODE>false</CODE> if the client cannot
+ * handle referrals and no more attempts should be made to send them for the associated search operation.
+ */
+ @Override
+ public boolean sendSearchReference(SearchOperation searchOperation, SearchResultReference searchReference) {
+ // Make sure this is not an LDAPv2 client. If it is, then they can't
+ // see referrals so we'll not send anything. Also, throw an
+ // exception so that the core server will know not to try sending
+ // any more referrals to this client for the rest of the operation.
+ if (ldapVersion == 2) {
+ logger.error(ERR_LDAPV2_SKIPPING_SEARCH_REFERENCE, getConnectionID(), searchOperation.getOperationID(),
+ searchReference);
+ return false;
+ }
+
+ final FlowableEmitter<Response> out = getOut(searchOperation);
+ out.onNext(Converters.from(searchReference));
+
+ return true;
+ }
+
+ /**
+ * Sends the provided intermediate response message to the client.
+ *
+ * @param intermediateResponse
+ * The intermediate response message to be sent.
+ * @return <CODE>true</CODE> if processing on the associated operation should continue, or <CODE>false</CODE> if
+ * not.
+ */
+ @Override
+ protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse) {
+ final Operation operation = intermediateResponse.getOperation();
+ final FlowableEmitter<Response> out = getOut(operation);
+
+ final Response response = Responses.newGenericIntermediateResponse(intermediateResponse.getOID(),
+ intermediateResponse.getValue());
+ for (Control control : intermediateResponse.getControls()) {
+ response.addControl(Converters.from(control));
+ }
+
out.onNext(response);
- }
- out.onComplete();
- }
- }
- /**
- * Retrieves an LDAPMessage containing a response generated from the
- * provided operation.
- *
- * @param operation
- * The operation to use to generate the response LDAPMessage.
- * @return An LDAPMessage containing a response generated from the
- * provided operation.
- */
- private Response operationToResponse(Operation operation)
- {
- ResultCode resultCode = operation.getResultCode();
- if (resultCode == null)
- {
- // This must mean that the operation has either not yet completed
- // or that it completed without a result for some reason. In any
- // case, log a message and set the response to "operations error".
- logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_NO_RESULT_CODE, operation.getOperationType(),
- operation.getConnectionID(), operation.getOperationID());
- resultCode = DirectoryServer.getServerErrorResultCode();
+ // The only reason we shouldn't continue processing is if the
+ // connection is closed.
+ return connectionValid;
}
- LocalizableMessageBuilder errorMessage = operation.getErrorMessage();
- String matchedDN = operation.getMatchedDN() != null ? operation.getMatchedDN().toString() : null;
-
- // Referrals are not allowed for LDAPv2 clients.
- List<String> referralURLs;
- if (ldapVersion == 2)
- {
- referralURLs = null;
-
- if (resultCode == ResultCode.REFERRAL)
- {
- resultCode = ResultCode.CONSTRAINT_VIOLATION;
- errorMessage.append(ERR_LDAPV2_REFERRAL_RESULT_CHANGED.get());
- }
-
- List<String> opReferrals = operation.getReferralURLs();
- if (opReferrals != null && !opReferrals.isEmpty())
- {
- StringBuilder referralsStr = new StringBuilder();
- Iterator<String> iterator = opReferrals.iterator();
- referralsStr.append(iterator.next());
-
- while (iterator.hasNext())
- {
- referralsStr.append(", ");
- referralsStr.append(iterator.next());
- }
-
- errorMessage.append(ERR_LDAPV2_REFERRALS_OMITTED.get(referralsStr));
- }
- }
- else
- {
- referralURLs = operation.getReferralURLs();
- }
-
- final Result result;
- switch (operation.getOperationType())
- {
- case ADD:
- result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
- break;
- case BIND:
- result = Responses.newBindResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN)
- .setServerSASLCredentials(((BindOperationBasis) operation).getServerSASLCredentials());
- break;
- case COMPARE:
- result = Responses.newCompareResult(resultCode)
- .setDiagnosticMessage(errorMessage.toString())
- .setMatchedDN(matchedDN);
- break;
- case DELETE:
- result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
- break;
- case EXTENDED:
- // If this an LDAPv2 client, then we can't send this.
- if (ldapVersion == 2)
- {
- logger.error(ERR_LDAPV2_SKIPPING_EXTENDED_RESPONSE,
- getConnectionID(), operation.getOperationID(), operation);
- return null;
- }
-
- ExtendedOperationBasis extOp = (ExtendedOperationBasis) operation;
- result = Responses.newGenericExtendedResult(resultCode)
- .setDiagnosticMessage(errorMessage.toString())
- .setMatchedDN(matchedDN)
- .setOID(extOp.getResponseOID()).setValue(extOp.getResponseValue());
- break;
- case MODIFY:
- result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
- break;
- case MODIFY_DN:
- result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
- break;
- case SEARCH:
- result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
- break;
- default:
- // This must be a type of operation that doesn't have a response.
- // This shouldn't happen, so log a message and return.
- logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_INVALID_OP, operation.getOperationType(), getConnectionID(),
- operation.getOperationID(), operation);
- return null;
- }
- if (referralURLs != null)
- {
- result.getReferralURIs().addAll(referralURLs);
- }
-
- // Controls are not allowed for LDAPv2 clients.
- if (ldapVersion != 2)
- {
- for(Control control : operation.getResponseControls()) {
- result.addControl(Converters.from(control));
- }
- }
-
- return result;
- }
-
- /**
- * Sends the provided search result entry to the client.
- *
- * @param searchOperation
- * The search operation with which the entry is associated
- * @param searchEntry
- * The search result entry to be sent to the client
- */
- @Override
- public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry)
- {
- getEmitter(searchOperation).onNext(toResponse(searchEntry));
- }
-
- private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation)
- {
- return getOut(searchOperation);
- }
-
- private Response toResponse(SearchResultEntry searchEntry)
- {
- return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
- }
-
- private FlowableEmitter<Response> getOut(Operation operation)
- {
- return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
- }
-
- /**
- * Sends the provided search result reference to the client.
- *
- * @param searchOperation
- * The search operation with which the reference is
- * associated.
- * @param searchReference
- * The search result reference to be sent to the client.
- * @return <CODE>true</CODE> if the client is able to accept
- * referrals, or <CODE>false</CODE> if the client cannot
- * handle referrals and no more attempts should be made to
- * send them for the associated search operation.
- */
- @Override
- public boolean sendSearchReference(SearchOperation searchOperation, SearchResultReference searchReference)
- {
- // Make sure this is not an LDAPv2 client. If it is, then they can't
- // see referrals so we'll not send anything. Also, throw an
- // exception so that the core server will know not to try sending
- // any more referrals to this client for the rest of the operation.
- if (ldapVersion == 2)
- {
- logger.error(ERR_LDAPV2_SKIPPING_SEARCH_REFERENCE, getConnectionID(),
- searchOperation.getOperationID(), searchReference);
- return false;
- }
-
- final FlowableEmitter<Response> out = getOut(searchOperation);
- out.onNext(Converters.from(searchReference));
-
- return true;
- }
-
- /**
- * Sends the provided intermediate response message to the client.
- *
- * @param intermediateResponse
- * The intermediate response message to be sent.
- * @return <CODE>true</CODE> if processing on the associated operation
- * should continue, or <CODE>false</CODE> if not.
- */
- @Override
- protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse)
- {
- final Operation operation = intermediateResponse.getOperation();
- final FlowableEmitter<Response> out = getOut(operation);
-
- final Response response =
- Responses.newGenericIntermediateResponse(intermediateResponse.getOID(), intermediateResponse.getValue());
- for (Control control : intermediateResponse.getControls())
- {
- response.addControl(Converters.from(control));
- }
-
- out.onNext(response);
-
- // The only reason we shouldn't continue processing is if the
- // connection is closed.
- return connectionValid;
- }
-
- /**
- * Closes the connection to the client, optionally sending it a
- * message indicating the reason for the closure. Note that the
- * ability to send a notice of disconnection may not be available for
- * all protocols or under all circumstances.
- *
- * @param disconnectReason
- * The disconnect reason that provides the generic cause for
- * the disconnect.
- * @param sendNotification
- * Indicates whether to try to provide notification to the
- * client that the connection will be closed.
- * @param message
- * The message to include in the disconnect notification
- * response. It may be <CODE>null</CODE> if no message is to
- * be sent.
- */
- @Override
- public void disconnect(DisconnectReason disconnectReason,
- boolean sendNotification, LocalizableMessage message)
- {
- // Set a flag indicating that the connection is being terminated so
- // that no new requests will be accepted. Also cancel all operations
- // in progress.
- synchronized (opsInProgressLock)
- {
- // If we are already in the middle of a disconnect, then don't
- // do anything.
- if (disconnectRequested)
- {
- return;
- }
-
- disconnectRequested = true;
- }
-
- if (keepStats)
- {
- statTracker.updateDisconnect();
- }
-
- if (connectionID >= 0)
- {
- DirectoryServer.connectionClosed(this);
- }
-
- // Indicate that this connection is no longer valid.
- connectionValid = false;
-
- final LocalizableMessage cancelMessage;
- if (message != null)
- {
- cancelMessage = new LocalizableMessageBuilder()
- .append(disconnectReason.getClosureMessage())
- .append(": ")
- .append(message)
- .toMessage();
- }
- else
- {
- cancelMessage = disconnectReason.getClosureMessage();
- }
- cancelAllOperations(new CancelRequest(true, cancelMessage));
- finalizeConnectionInternal();
-
- // See if we should send a notification to the client. If so, then
- // construct and send a notice of disconnection unsolicited
- // response. Note that we cannot send this notification to an LDAPv2 client.
- if (sendNotification && ldapVersion != 2)
- {
- try
- {
- LocalizableMessage errMsg = message != null ? message : INFO_LDAP_CLIENT_GENERIC_NOTICE_OF_DISCONNECTION.get();
- clientContext.disconnect(ResultCode.valueOf(toResultCode(disconnectReason)), errMsg.toString());
- }
- catch (Exception e)
- {
- // NYI -- Log a message indicating that we couldn't send the
- // notice of disconnection.
- logger.traceException(e);
- }
- }
- else
- {
- clientContext.disconnect();
- }
-
- // NYI -- Deregister the client connection from any server components that
- // might know about it.
-
- logDisconnect(this, disconnectReason, message);
-
- try
- {
- PluginConfigManager pluginManager = DirectoryServer.getPluginConfigManager();
- pluginManager.invokePostDisconnectPlugins(this, disconnectReason, message);
- }
- catch (Exception e)
- {
- logger.traceException(e);
- }
- }
-
- private int toResultCode(DisconnectReason disconnectReason)
- {
- switch (disconnectReason)
- {
- case PROTOCOL_ERROR:
- return LDAPResultCode.PROTOCOL_ERROR;
- case SERVER_SHUTDOWN:
- return LDAPResultCode.UNAVAILABLE;
- case SERVER_ERROR:
- return DirectoryServer.getServerErrorResultCode().intValue();
- case ADMIN_LIMIT_EXCEEDED:
- case IDLE_TIME_LIMIT_EXCEEDED:
- case MAX_REQUEST_SIZE_EXCEEDED:
- case IO_TIMEOUT:
- return LDAPResultCode.ADMIN_LIMIT_EXCEEDED;
- case CONNECTION_REJECTED:
- return LDAPResultCode.CONSTRAINT_VIOLATION;
- case INVALID_CREDENTIALS:
- return LDAPResultCode.INVALID_CREDENTIALS;
- default:
- return LDAPResultCode.OTHER;
- }
- }
-
- /**
- * Retrieves the set of operations in progress for this client
- * connection. This list must not be altered by any caller.
- *
- * @return The set of operations in progress for this client
- * connection.
- */
- @Override
- public Collection<Operation> getOperationsInProgress()
- {
- return operationsInProgress.values();
- }
-
- /**
- * Retrieves the operation in progress with the specified message ID.
- *
- * @param messageID
- * The message ID for the operation to retrieve.
- * @return The operation in progress with the specified message ID, or
- * <CODE>null</CODE> if no such operation could be found.
- */
- @Override
- public Operation getOperationInProgress(int messageID)
- {
- return operationsInProgress.get(messageID);
- }
-
- /**
- * Adds the provided operation to the set of operations in progress
- * for this client connection.
- *
- * @param operation
- * The operation to add to the set of operations in progress
- * for this client connection.
- * @throws DirectoryException
- * If the operation is not added for some reason (e.g., the
- * client already has reached the maximum allowed concurrent
- * requests).
- */
- private void addOperationInProgress(final QueueingStrategy queueingStrategy, Operation operation)
- throws DirectoryException
- {
- int messageID = operation.getMessageID();
-
- // We need to grab a lock to ensure that no one else can add
- // operations to the queue while we are performing some preliminary
- // checks.
- try
- {
- synchronized (opsInProgressLock)
- {
- // If we're already in the process of disconnecting the client,
- // then reject the operation.
- if (disconnectRequested)
- {
- LocalizableMessage message = WARN_CLIENT_DISCONNECT_IN_PROGRESS.get();
- throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
- message);
- }
-
- // Add the operation to the list of operations in progress for
- // this connection.
- Operation op = operationsInProgress.putIfAbsent(messageID, operation);
-
- // See if there is already an operation in progress with the
- // same message ID. If so, then we can't allow it.
- if (op != null)
- {
- LocalizableMessage message =
- WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
- throw new DirectoryException(ResultCode.PROTOCOL_ERROR,
- message);
- }
- }
-
- // Try to add the operation to the work queue,
- // or run it synchronously (typically for the administration
- // connector)
- queueingStrategy.enqueueRequest(operation);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- operationsInProgress.remove(messageID);
- lastCompletionTime.set(TimeThread.getTime());
-
- throw de;
- }
- catch (Exception e)
- {
- logger.traceException(e);
-
- LocalizableMessage message =
- WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
- throw new DirectoryException(DirectoryServer
- .getServerErrorResultCode(), message, e);
- }
- }
-
- /**
- * Removes the provided operation from the set of operations in
- * progress for this client connection. Note that this does not make
- * any attempt to cancel any processing that may already be in
- * progress for the operation.
- *
- * @param messageID
- * The message ID of the operation to remove from the set of
- * operations in progress.
- * @return <CODE>true</CODE> if the operation was found and removed
- * from the set of operations in progress, or
- * <CODE>false</CODE> if not.
- */
- @Override
- public boolean removeOperationInProgress(int messageID)
- {
- Operation operation = operationsInProgress.remove(messageID);
- if (operation == null)
- {
- return false;
- }
-
- if (operation.getOperationType() == OperationType.ABANDON
- && keepStats
- && operation.getResultCode() == ResultCode.CANCELLED)
- {
- statTracker.updateAbandonedOperation();
- }
-
- lastCompletionTime.set(TimeThread.getTime());
- return true;
- }
-
- /**
- * Attempts to cancel the specified operation.
- *
- * @param messageID
- * The message ID of the operation to cancel.
- * @param cancelRequest
- * An object providing additional information about how the
- * cancel should be processed.
- * @return A cancel result that either indicates that the cancel was
- * successful or provides a reason that it was not.
- */
- @Override
- public CancelResult cancelOperation(int messageID,
- CancelRequest cancelRequest)
- {
- Operation op = operationsInProgress.get(messageID);
- if (op != null)
- {
- return op.cancel(cancelRequest);
- }
-
- // See if the operation is in the list of persistent searches.
- for (PersistentSearch ps : getPersistentSearches())
- {
- if (ps.getMessageID() == messageID)
- {
- // We only need to find the first persistent search
- // associated with the provided message ID. The persistent search
- // will ensure that all other related persistent searches are cancelled.
- return ps.cancel();
- }
- }
- return new CancelResult(ResultCode.NO_SUCH_OPERATION, null);
- }
-
- /**
- * Attempts to cancel all operations in progress on this connection.
- *
- * @param cancelRequest
- * An object providing additional information about how the
- * cancel should be processed.
- */
- @Override
- public void cancelAllOperations(CancelRequest cancelRequest)
- {
- // Make sure that no one can add any new operations.
- synchronized (opsInProgressLock)
- {
- try
- {
- for (Operation o : operationsInProgress.values())
- {
- try
- {
- o.abort(cancelRequest);
-
- // TODO: Assume its cancelled?
- if (keepStats)
- {
- statTracker.updateAbandonedOperation();
+ /**
+ * Closes the connection to the client, optionally sending it a message indicating the reason for the closure. Note
+ * that the ability to send a notice of disconnection may not be available for all protocols or under all
+ * circumstances.
+ *
+ * @param disconnectReason
+ * The disconnect reason that provides the generic cause for the disconnect.
+ * @param sendNotification
+ * Indicates whether to try to provide notification to the client that the connection will be closed.
+ * @param message
+ * The message to include in the disconnect notification response. It may be <CODE>null</CODE> if no
+ * message is to be sent.
+ */
+ @Override
+ public void disconnect(DisconnectReason disconnectReason, boolean sendNotification, LocalizableMessage message) {
+ // Set a flag indicating that the connection is being terminated so
+ // that no new requests will be accepted. Also cancel all operations
+ // in progress.
+ synchronized (opsInProgressLock) {
+ // If we are already in the middle of a disconnect, then don't
+ // do anything.
+ if (disconnectRequested) {
+ return;
}
- }
- catch (Exception e)
- {
+
+ disconnectRequested = true;
+ }
+
+ if (keepStats) {
+ statTracker.updateDisconnect();
+ }
+
+ if (connectionID >= 0) {
+ DirectoryServer.connectionClosed(this);
+ }
+
+ // Indicate that this connection is no longer valid.
+ connectionValid = false;
+
+ final LocalizableMessage cancelMessage;
+ if (message != null) {
+ cancelMessage = new LocalizableMessageBuilder().append(disconnectReason.getClosureMessage()).append(": ")
+ .append(message).toMessage();
+ } else {
+ cancelMessage = disconnectReason.getClosureMessage();
+ }
+ cancelAllOperations(new CancelRequest(true, cancelMessage));
+ finalizeConnectionInternal();
+
+ // See if we should send a notification to the client. If so, then
+ // construct and send a notice of disconnection unsolicited
+ // response. Note that we cannot send this notification to an LDAPv2 client.
+ if (sendNotification && ldapVersion != 2) {
+ try {
+ LocalizableMessage errMsg = message != null ? message
+ : INFO_LDAP_CLIENT_GENERIC_NOTICE_OF_DISCONNECTION.get();
+ clientContext.disconnect(ResultCode.valueOf(toResultCode(disconnectReason)), errMsg.toString());
+ } catch (Exception e) {
+ // NYI -- Log a message indicating that we couldn't send the
+ // notice of disconnection.
+ logger.traceException(e);
+ }
+ } else {
+ clientContext.disconnect();
+ }
+
+ // NYI -- Deregister the client connection from any server components that
+ // might know about it.
+
+ logDisconnect(this, disconnectReason, message);
+
+ try {
+ PluginConfigManager pluginManager = DirectoryServer.getPluginConfigManager();
+ pluginManager.invokePostDisconnectPlugins(this, disconnectReason, message);
+ } catch (Exception e) {
logger.traceException(e);
- }
}
-
- if (!operationsInProgress.isEmpty()
- || !getPersistentSearches().isEmpty())
- {
- lastCompletionTime.set(TimeThread.getTime());
- }
-
- operationsInProgress.clear();
-
- for (PersistentSearch persistentSearch : getPersistentSearches())
- {
- persistentSearch.cancel();
- }
- }
- catch (Exception e)
- {
- logger.traceException(e);
- }
}
- }
- /**
- * Attempts to cancel all operations in progress on this connection
- * except the operation with the specified message ID.
- *
- * @param cancelRequest
- * An object providing additional information about how the
- * cancel should be processed.
- * @param messageID
- * The message ID of the operation that should not be
- * canceled.
- */
- @Override
- public void cancelAllOperationsExcept(CancelRequest cancelRequest,
- int messageID)
- {
- // Make sure that no one can add any new operations.
- synchronized (opsInProgressLock)
- {
- try
- {
- for (int msgID : operationsInProgress.keySet())
- {
- if (msgID == messageID)
- {
- continue;
- }
-
- Operation o = operationsInProgress.get(msgID);
- if (o != null)
- {
- try
- {
- o.abort(cancelRequest);
-
- // TODO: Assume its cancelled?
- if (keepStats)
- {
- statTracker.updateAbandonedOperation();
- }
- }
- catch (Exception e)
- {
- logger.traceException(e);
- }
- }
-
- operationsInProgress.remove(msgID);
- lastCompletionTime.set(TimeThread.getTime());
+ private int toResultCode(DisconnectReason disconnectReason) {
+ switch (disconnectReason) {
+ case PROTOCOL_ERROR:
+ return LDAPResultCode.PROTOCOL_ERROR;
+ case SERVER_SHUTDOWN:
+ return LDAPResultCode.UNAVAILABLE;
+ case SERVER_ERROR:
+ return DirectoryServer.getServerErrorResultCode().intValue();
+ case ADMIN_LIMIT_EXCEEDED:
+ case IDLE_TIME_LIMIT_EXCEEDED:
+ case MAX_REQUEST_SIZE_EXCEEDED:
+ case IO_TIMEOUT:
+ return LDAPResultCode.ADMIN_LIMIT_EXCEEDED;
+ case CONNECTION_REJECTED:
+ return LDAPResultCode.CONSTRAINT_VIOLATION;
+ case INVALID_CREDENTIALS:
+ return LDAPResultCode.INVALID_CREDENTIALS;
+ default:
+ return LDAPResultCode.OTHER;
}
-
- for (PersistentSearch persistentSearch : getPersistentSearches())
- {
- if (persistentSearch.getMessageID() == messageID)
- {
- continue;
- }
-
- persistentSearch.cancel();
- lastCompletionTime.set(TimeThread.getTime());
- }
- }
- catch (Exception e)
- {
- logger.traceException(e);
- }
}
- }
- @Override
- public Selector getWriteSelector()
- {
- throw new UnsupportedOperationException();
- }
+ /**
+ * Retrieves the set of operations in progress for this client connection. This list must not be altered by any
+ * caller.
+ *
+ * @return The set of operations in progress for this client connection.
+ */
+ @Override
+ public Collection<Operation> getOperationsInProgress() {
+ return operationsInProgress.values();
+ }
- @Override
- public long getMaxBlockedWriteTimeLimit()
- {
- return connectionHandler.getMaxBlockedWriteTimeLimit();
- }
+ /**
+ * Retrieves the operation in progress with the specified message ID.
+ *
+ * @param messageID
+ * The message ID for the operation to retrieve.
+ * @return The operation in progress with the specified message ID, or <CODE>null</CODE> if no such operation could
+ * be found.
+ */
+ @Override
+ public Operation getOperationInProgress(int messageID) {
+ return operationsInProgress.get(messageID);
+ }
- /**
- * Returns the total number of operations initiated on this
- * connection.
- *
- * @return the total number of operations on this connection
- */
- @Override
- public long getNumberOfOperations()
- {
- return operationsPerformed.get();
- }
+ /**
+ * Adds the provided operation to the set of operations in progress for this client connection.
+ *
+ * @param operation
+ * The operation to add to the set of operations in progress for this client connection.
+ * @throws DirectoryException
+ * If the operation is not added for some reason (e.g., the client already has reached the maximum
+ * allowed concurrent requests).
+ */
+ private void addOperationInProgress(final QueueingStrategy queueingStrategy, Operation operation)
+ throws DirectoryException {
+ int messageID = operation.getMessageID();
- /**
- * Processes the provided LDAP message read from the client and takes
- * whatever action is appropriate. For most requests, this will
- * include placing the operation in the work queue. Certain requests
- * (in particular, abandons and unbinds) will be processed directly.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message to process.
- * @return <CODE>true</CODE> if the appropriate action was taken for
- * the request, or <CODE>false</CODE> if there was a fatal
- * error and the client has been disconnected as a result, or
- * if the client unbound from the server.
- */
+ // We need to grab a lock to ensure that no one else can add
+ // operations to the queue while we are performing some preliminary
+ // checks.
+ try {
+ synchronized (opsInProgressLock) {
+ // If we're already in the process of disconnecting the client,
+ // then reject the operation.
+ if (disconnectRequested) {
+ LocalizableMessage message = WARN_CLIENT_DISCONNECT_IN_PROGRESS.get();
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
+ }
+
+ // Add the operation to the list of operations in progress for
+ // this connection.
+ Operation op = operationsInProgress.putIfAbsent(messageID, operation);
+
+ // See if there is already an operation in progress with the
+ // same message ID. If so, then we can't allow it.
+ if (op != null) {
+ LocalizableMessage message = WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
+ throw new DirectoryException(ResultCode.PROTOCOL_ERROR, message);
+ }
+ }
+
+ // Try to add the operation to the work queue,
+ // or run it synchronously (typically for the administration
+ // connector)
+ queueingStrategy.enqueueRequest(operation);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ operationsInProgress.remove(messageID);
+ lastCompletionTime.set(TimeThread.getTime());
+
+ throw de;
+ } catch (Exception e) {
+ logger.traceException(e);
+
+ LocalizableMessage message = WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
+ throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message, e);
+ }
+ }
+
+ /**
+ * Removes the provided operation from the set of operations in progress for this client connection. Note that this
+ * does not make any attempt to cancel any processing that may already be in progress for the operation.
+ *
+ * @param messageID
+ * The message ID of the operation to remove from the set of operations in progress.
+ * @return <CODE>true</CODE> if the operation was found and removed from the set of operations in progress, or
+ * <CODE>false</CODE> if not.
+ */
+ @Override
+ public boolean removeOperationInProgress(int messageID) {
+ Operation operation = operationsInProgress.remove(messageID);
+ if (operation == null) {
+ return false;
+ }
+
+ if (operation.getOperationType() == OperationType.ABANDON && keepStats
+ && operation.getResultCode() == ResultCode.CANCELLED) {
+ statTracker.updateAbandonedOperation();
+ }
+
+ lastCompletionTime.set(TimeThread.getTime());
+ return true;
+ }
+
+ /**
+ * Attempts to cancel the specified operation.
+ *
+ * @param messageID
+ * The message ID of the operation to cancel.
+ * @param cancelRequest
+ * An object providing additional information about how the cancel should be processed.
+ * @return A cancel result that either indicates that the cancel was successful or provides a reason that it was
+ * not.
+ */
+ @Override
+ public CancelResult cancelOperation(int messageID, CancelRequest cancelRequest) {
+ Operation op = operationsInProgress.get(messageID);
+ if (op != null) {
+ return op.cancel(cancelRequest);
+ }
+
+ // See if the operation is in the list of persistent searches.
+ for (PersistentSearch ps : getPersistentSearches()) {
+ if (ps.getMessageID() == messageID) {
+ // We only need to find the first persistent search
+ // associated with the provided message ID. The persistent search
+ // will ensure that all other related persistent searches are cancelled.
+ return ps.cancel();
+ }
+ }
+ return new CancelResult(ResultCode.NO_SUCH_OPERATION, null);
+ }
+
+ /**
+ * Attempts to cancel all operations in progress on this connection.
+ *
+ * @param cancelRequest
+ * An object providing additional information about how the cancel should be processed.
+ */
+ @Override
+ public void cancelAllOperations(CancelRequest cancelRequest) {
+ // Make sure that no one can add any new operations.
+ synchronized (opsInProgressLock) {
+ try {
+ for (Operation o : operationsInProgress.values()) {
+ try {
+ o.abort(cancelRequest);
+
+ // TODO: Assume its cancelled?
+ if (keepStats) {
+ statTracker.updateAbandonedOperation();
+ }
+ } catch (Exception e) {
+ logger.traceException(e);
+ }
+ }
+
+ if (!operationsInProgress.isEmpty() || !getPersistentSearches().isEmpty()) {
+ lastCompletionTime.set(TimeThread.getTime());
+ }
+
+ operationsInProgress.clear();
+
+ for (PersistentSearch persistentSearch : getPersistentSearches()) {
+ persistentSearch.cancel();
+ }
+ } catch (Exception e) {
+ logger.traceException(e);
+ }
+ }
+ }
+
+ /**
+ * Attempts to cancel all operations in progress on this connection except the operation with the specified message
+ * ID.
+ *
+ * @param cancelRequest
+ * An object providing additional information about how the cancel should be processed.
+ * @param messageID
+ * The message ID of the operation that should not be canceled.
+ */
+ @Override
+ public void cancelAllOperationsExcept(CancelRequest cancelRequest, int messageID) {
+ // Make sure that no one can add any new operations.
+ synchronized (opsInProgressLock) {
+ try {
+ for (int msgID : operationsInProgress.keySet()) {
+ if (msgID == messageID) {
+ continue;
+ }
+
+ Operation o = operationsInProgress.get(msgID);
+ if (o != null) {
+ try {
+ o.abort(cancelRequest);
+
+ // TODO: Assume its cancelled?
+ if (keepStats) {
+ statTracker.updateAbandonedOperation();
+ }
+ } catch (Exception e) {
+ logger.traceException(e);
+ }
+ }
+
+ operationsInProgress.remove(msgID);
+ lastCompletionTime.set(TimeThread.getTime());
+ }
+
+ for (PersistentSearch persistentSearch : getPersistentSearches()) {
+ if (persistentSearch.getMessageID() == messageID) {
+ continue;
+ }
+
+ persistentSearch.cancel();
+ lastCompletionTime.set(TimeThread.getTime());
+ }
+ } catch (Exception e) {
+ logger.traceException(e);
+ }
+ }
+ }
+
+ @Override
+ public Selector getWriteSelector() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMaxBlockedWriteTimeLimit() {
+ return connectionHandler.getMaxBlockedWriteTimeLimit();
+ }
+
+ /**
+ * Returns the total number of operations initiated on this connection.
+ *
+ * @return the total number of operations on this connection
+ */
+ @Override
+ public long getNumberOfOperations() {
+ return operationsPerformed.get();
+ }
+
+ /**
+ * Processes the provided LDAP message read from the client and takes whatever action is appropriate. For most
+ * requests, this will include placing the operation in the work queue. Certain requests (in particular, abandons
+ * and unbinds) will be processed directly.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message to process.
+ * @return <CODE>true</CODE> if the appropriate action was taken for the request, or <CODE>false</CODE> if there was
+ * a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
+ */
@Override
public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
@@ -1124,855 +957,693 @@
}, BackpressureMode.NONE).onBackpressureBuffer(64, null, BackpressureOverflowStrategy.ERROR)));
}
- private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final FlowableEmitter<Response> out)
- {
- if (keepStats)
- {
- statTracker.updateMessageRead(message);
+ private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final FlowableEmitter<Response> out) {
+ if (keepStats) {
+ statTracker.updateMessageRead(message);
+ }
+ operationsPerformed.getAndIncrement();
+
+ List<Control> opControls = message.getControls();
+
+ // FIXME -- See if there is a bind in progress. If so, then deny
+ // most kinds of operations.
+
+ // Figure out what type of operation we're dealing with based on the
+ // LDAP message. Abandon and unbind requests will be processed here.
+ // All other types of requests will be encapsulated into operations
+ // and append into the work queue to be picked up by a worker
+ // thread. Any other kinds of LDAP messages (e.g., response
+ // messages) are illegal and will result in the connection being
+ // terminated.
+ try {
+ if (bindInProgress.get()) {
+ throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_BIND_IN_PROGRESS.get());
+ } else if (startTLSInProgress.get()) {
+ throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_STARTTLS_IN_PROGRESS.get());
+ } else if (saslBindInProgress.get() && message.getProtocolOpType() != OP_TYPE_BIND_REQUEST) {
+ throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_SASLBIND_IN_PROGRESS.get());
+ }
+
+ boolean result;
+ switch (message.getProtocolOpType()) {
+ case OP_TYPE_ABANDON_REQUEST:
+ return processAbandonRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_ADD_REQUEST:
+ return processAddRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_BIND_REQUEST:
+ boolean isSaslBind =
+ message.getBindRequestProtocolOp().getAuthenticationType() == AuthenticationType.SASL;
+ bindInProgress.set(true);
+ if (isSaslBind) {
+ saslBindInProgress.set(true);
+ }
+ result = processBindRequest(queueingStrategy, message, opControls, out);
+ if (!result) {
+ bindInProgress.set(false);
+ if (isSaslBind) {
+ saslBindInProgress.set(false);
+ }
+ }
+ return result;
+ case OP_TYPE_COMPARE_REQUEST:
+ return processCompareRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_DELETE_REQUEST:
+ return processDeleteRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_EXTENDED_REQUEST:
+ boolean isStartTlsRequest = OID_START_TLS_REQUEST.equals(message.getExtendedRequestProtocolOp()
+ .getOID());
+ if (isStartTlsRequest) {
+ startTLSInProgress.set(true);
+ }
+ result = processExtendedRequest(queueingStrategy, message, opControls, out);
+ if (!result && isStartTlsRequest) {
+ startTLSInProgress.set(false);
+ }
+ return result;
+ case OP_TYPE_MODIFY_REQUEST:
+ return processModifyRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_MODIFY_DN_REQUEST:
+ return processModifyDNRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_SEARCH_REQUEST:
+ return processSearchRequest(queueingStrategy, message, opControls, out);
+ case OP_TYPE_UNBIND_REQUEST:
+ return processUnbindRequest(message, opControls);
+ default:
+ LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_INVALID_REQUEST_TYPE.get(
+ message.getProtocolOpName(), message.getMessageID());
+ disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
+ return false;
+ }
+ } catch (Exception e) {
+ logger.traceException(e);
+
+ LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_PROCESSING_FAILURE.get(message.getProtocolOpName(),
+ message.getMessageID(), e);
+ disconnect(DisconnectReason.SERVER_ERROR, true, msg);
+ return false;
+ }
}
- operationsPerformed.getAndIncrement();
- List<Control> opControls = message.getControls();
+ /**
+ * Processes the provided LDAP message as an abandon request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the abandon request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processAbandonRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ disconnectControlsNotAllowed();
+ return false;
+ }
- // FIXME -- See if there is a bind in progress. If so, then deny
- // most kinds of operations.
+ // Create the abandon operation and add it into the work queue.
+ AbandonRequestProtocolOp protocolOp = message.getAbandonRequestProtocolOp();
+ AbandonOperationBasis abandonOp = new AbandonOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getIDToAbandon());
+ abandonOp.setAttachment(REACTIVE_OUT, out);
- // Figure out what type of operation we're dealing with based on the
- // LDAP message. Abandon and unbind requests will be processed here.
- // All other types of requests will be encapsulated into operations
- // and append into the work queue to be picked up by a worker
- // thread. Any other kinds of LDAP messages (e.g., response
- // messages) are illegal and will result in the connection being
- // terminated.
- try
- {
- if (bindInProgress.get())
- {
- throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_BIND_IN_PROGRESS.get());
- }
- else if (startTLSInProgress.get())
- {
- throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_STARTTLS_IN_PROGRESS.get());
- }
- else if (saslBindInProgress.get() && message.getProtocolOpType() != OP_TYPE_BIND_REQUEST)
- {
- throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_SASLBIND_IN_PROGRESS.get());
- }
+ try {
+ addOperationInProgress(queueingStrategy, abandonOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
- boolean result;
- switch (message.getProtocolOpType())
- {
- case OP_TYPE_ABANDON_REQUEST:
- return processAbandonRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_ADD_REQUEST:
- return processAddRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_BIND_REQUEST:
- boolean isSaslBind = message.getBindRequestProtocolOp().getAuthenticationType() == AuthenticationType.SASL;
- bindInProgress.set(true);
- if (isSaslBind)
- {
- saslBindInProgress.set(true);
+ // Don't send an error response since abandon operations
+ // don't have a response.
}
- result = processBindRequest(queueingStrategy, message, opControls, out);
- if(!result)
- {
- bindInProgress.set(false);
- if (isSaslBind)
- {
- saslBindInProgress.set(false);
- }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as an add request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the add request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processAddRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
}
- return result;
- case OP_TYPE_COMPARE_REQUEST:
- return processCompareRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_DELETE_REQUEST:
- return processDeleteRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_EXTENDED_REQUEST:
- boolean isStartTlsRequest = OID_START_TLS_REQUEST.equals(message.getExtendedRequestProtocolOp().getOID());
- if (isStartTlsRequest)
- {
- startTLSInProgress.set(true);
+
+ // Create the add operation and add it into the work queue.
+ AddRequestProtocolOp protocolOp = message.getAddRequestProtocolOp();
+ AddOperationBasis addOp = new AddOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getAttributes());
+ addOp.setAttachment(REACTIVE_OUT, out);
+
+ try {
+ addOperationInProgress(queueingStrategy, addOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ final Result result = Responses.newResult(de.getResultCode())
+ .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
+ for (String referral : de.getReferralURLs()) {
+ result.addReferralURI(referral);
+ }
+
+ out.onNext(result);
+ out.onComplete();
}
- result = processExtendedRequest(queueingStrategy, message, opControls, out);
- if (!result && isStartTlsRequest)
- {
- startTLSInProgress.set(false);
+
+ return connectionValid;
+ }
+
+ private void disconnectControlsNotAllowed() {
+ disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
+ }
+
+ /**
+ * Processes the provided LDAP message as a bind request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the bind request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processBindRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ BindRequestProtocolOp protocolOp = message.getBindRequestProtocolOp();
+
+ // See if this is an LDAPv2 bind request, and if so whether that
+ // should be allowed.
+ String versionString;
+ switch (ldapVersion = protocolOp.getProtocolVersion()) {
+ case 2:
+ versionString = "2";
+
+ if (!connectionHandler.allowLDAPv2()) {
+ out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
+ return false;
+ }
+
+ if (!controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
+ }
+
+ break;
+ case 3:
+ versionString = "3";
+ break;
+ default:
+ // Unsupported protocol version. RFC4511 states that we MUST send
+ // a protocol error back to the client.
+ out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion).toString()));
+ out.onComplete();
+ disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion));
+ return false;
}
- return result;
- case OP_TYPE_MODIFY_REQUEST:
- return processModifyRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_MODIFY_DN_REQUEST:
- return processModifyDNRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_SEARCH_REQUEST:
- return processSearchRequest(queueingStrategy, message, opControls, out);
- case OP_TYPE_UNBIND_REQUEST:
- return processUnbindRequest(message, opControls);
- default:
- LocalizableMessage msg =
- ERR_LDAP_DISCONNECT_DUE_TO_INVALID_REQUEST_TYPE.get(message
- .getProtocolOpName(), message.getMessageID());
- disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
+
+ ByteString bindDN = protocolOp.getDN();
+
+ BindOperationBasis bindOp;
+ switch (protocolOp.getAuthenticationType()) {
+ case SIMPLE:
+ bindOp = new BindOperationBasis(this, nextOperationID.getAndIncrement(), message.getMessageID(), controls,
+ versionString, bindDN, protocolOp.getSimplePassword());
+ break;
+ case SASL:
+ bindOp = new BindOperationBasis(this, nextOperationID.getAndIncrement(), message.getMessageID(), controls,
+ versionString, bindDN, protocolOp.getSASLMechanism(), protocolOp.getSASLCredentials());
+ break;
+ default:
+ // This is an invalid authentication type, and therefore a
+ // protocol error. As per RFC 2251, a protocol error in a bind
+ // request must result in terminating the connection.
+ LocalizableMessage msg = ERR_LDAP_INVALID_BIND_AUTH_TYPE.get(message.getMessageID(),
+ protocolOp.getAuthenticationType());
+ disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
+ return false;
+ }
+
+ // Add the operation into the work queue.
+ bindOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, bindOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ final Result result = Responses.newBindResult(de.getResultCode())
+ .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
+ for (String referral : de.getReferralURLs()) {
+ result.addReferralURI(referral);
+ }
+
+ out.onNext(result);
+ out.onComplete();
+
+ // If it was a protocol error, then terminate the connection.
+ if (de.getResultCode() == ResultCode.PROTOCOL_ERROR) {
+ LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_BIND_PROTOCOL_ERROR.get(message.getMessageID(),
+ de.getMessageObject());
+ disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
+ }
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as a compare request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the compare request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processCompareRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
+ }
+
+ CompareRequestProtocolOp protocolOp = message.getCompareRequestProtocolOp();
+ CompareOperationBasis compareOp = new CompareOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getAttributeType(),
+ protocolOp.getAssertionValue());
+
+ // Add the operation into the work queue.
+ compareOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, compareOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ final CompareResult result = Responses.newCompareResult(de.getResultCode())
+ .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
+ result.getReferralURIs().addAll(de.getReferralURLs());
+ out.onNext(result);
+ out.onComplete();
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as a delete request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the delete request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processDeleteRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
+ }
+
+ DeleteRequestProtocolOp protocolOp = message.getDeleteRequestProtocolOp();
+ DeleteOperationBasis deleteOp = new DeleteOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getDN());
+
+ // Add the operation into the work queue.
+ deleteOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, deleteOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ final Result result = Responses.newResult(de.getResultCode())
+ .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
+ result.getReferralURIs().addAll(de.getReferralURLs());
+
+ out.onNext(result);
+ out.onComplete();
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as an extended request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the extended request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processExtendedRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ // See if this is an LDAPv2 client. If it is, then they should not
+ // be issuing extended requests. We can't send a response that we
+ // can be sure they can understand, so we have no choice but to
+ // close the connection.
+ if (ldapVersion == 2) {
+ // LDAPv2 clients aren't allowed to send controls.
+ LocalizableMessage msg = ERR_LDAPV2_EXTENDED_REQUEST_NOT_ALLOWED.get(getConnectionID(),
+ message.getMessageID());
+
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(msg.toString()));
+ out.onComplete();
+
+ logger.error(msg);
+ disconnect(DisconnectReason.PROTOCOL_ERROR, false, msg);
+ return false;
+ }
+
+ // FIXME -- Do we need to handle certain types of request here?
+ // -- StartTLS requests
+ // -- Cancel requests
+
+ ExtendedRequestProtocolOp protocolOp = message.getExtendedRequestProtocolOp();
+ ExtendedOperationBasis extendedOp = new ExtendedOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getOID(), protocolOp.getValue());
+
+ // Add the operation into the work queue.
+ extendedOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, extendedOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+ final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
+ .setMatchedDN(de.getMatchedDN().toString());
+ result.getReferralURIs().addAll(de.getReferralURLs());
+
+ out.onNext(result);
+ out.onComplete();
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as a modify request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the modify request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processModifyRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
+ }
+
+ ModifyRequestProtocolOp protocolOp = message.getModifyRequestProtocolOp();
+ ModifyOperationBasis modifyOp = new ModifyOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getModifications());
+
+ // Add the operation into the work queue.
+ modifyOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, modifyOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+ final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
+ .setMatchedDN(de.getMatchedDN().toString());
+ result.getReferralURIs().addAll(de.getReferralURLs());
+
+ out.onNext(result);
+ out.onComplete();
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as a modify DN request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the modify DN request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processModifyDNRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
+ }
+
+ ModifyDNRequestProtocolOp protocolOp = message.getModifyDNRequestProtocolOp();
+ ModifyDNOperationBasis modifyDNOp = new ModifyDNOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getEntryDN(), protocolOp.getNewRDN(),
+ protocolOp.deleteOldRDN(), protocolOp.getNewSuperior());
+
+ // Add the operation into the work queue.
+ modifyDNOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, modifyDNOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
+ .setMatchedDN(de.getMatchedDN().toString());
+ result.getReferralURIs().addAll(de.getReferralURLs());
+ for (Control control : modifyDNOp.getResponseControls()) {
+ result.addControl(Converters.from(control));
+ }
+ out.onNext(result);
+ out.onComplete();
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as a search request.
+ *
+ * @param queueingStrategy
+ * The {@link QueueingStrategy} to use for operation
+ * @param message
+ * The LDAP message containing the search request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processSearchRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+ final List<Control> controls, final FlowableEmitter<Response> out) {
+ if (ldapVersion == 2 && !controls.isEmpty()) {
+ // LDAPv2 clients aren't allowed to send controls.
+ out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+ ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+ out.onComplete();
+ disconnectControlsNotAllowed();
+ return false;
+ }
+
+ SearchRequestProtocolOp protocolOp = message.getSearchRequestProtocolOp();
+ SearchOperationBasis searchOp = new SearchOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls, protocolOp.getBaseDN(), protocolOp.getScope(),
+ protocolOp.getDereferencePolicy(), protocolOp.getSizeLimit(), protocolOp.getTimeLimit(),
+ protocolOp.getTypesOnly(), protocolOp.getFilter(), protocolOp.getAttributes());
+
+ // Add the operation into the work queue.
+ searchOp.setAttachment(REACTIVE_OUT, out);
+ try {
+ addOperationInProgress(queueingStrategy, searchOp);
+ } catch (DirectoryException de) {
+ logger.traceException(de);
+
+ final Result result = Responses.newResult(de.getResultCode());
+ if (de.getMessage() != null) {
+ result.setDiagnosticMessage(de.getMessage());
+ }
+ if (de.getMatchedDN() != null) {
+ result.setMatchedDN(de.getMatchedDN().toString());
+ }
+ if (de.getReferralURLs() != null) {
+ result.getReferralURIs().addAll(de.getReferralURLs());
+ }
+ if (searchOp.getResponseControls() != null) {
+ for (Control control : searchOp.getResponseControls()) {
+ result.addControl(Converters.from(control));
+ }
+ }
+ out.onNext(result);
+ out.onComplete();
+ }
+
+ return connectionValid;
+ }
+
+ /**
+ * Processes the provided LDAP message as an unbind request.
+ *
+ * @param message
+ * The LDAP message containing the unbind request to process.
+ * @param controls
+ * The set of pre-decoded request controls contained in the message.
+ * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+ * connection has been closed as a result (it is the responsibility of this method to close the connection).
+ */
+ private boolean processUnbindRequest(final LDAPMessage message, final List<Control> controls) {
+ UnbindOperationBasis unbindOp = new UnbindOperationBasis(this, nextOperationID.getAndIncrement(),
+ message.getMessageID(), controls);
+
+ unbindOp.run();
+
+ // The client connection will never be valid after an unbind.
return false;
- }
- }
- catch (Exception e)
- {
- logger.traceException(e);
-
- LocalizableMessage msg =
- ERR_LDAP_DISCONNECT_DUE_TO_PROCESSING_FAILURE.get(message
- .getProtocolOpName(), message.getMessageID(), e);
- disconnect(DisconnectReason.SERVER_ERROR, true, msg);
- return false;
- }
- }
-
- /**
- * Processes the provided LDAP message as an abandon request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the abandon request to
- * process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processAbandonRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- disconnectControlsNotAllowed();
- return false;
}
- // Create the abandon operation and add it into the work queue.
- AbandonRequestProtocolOp protocolOp =
- message.getAbandonRequestProtocolOp();
- AbandonOperationBasis abandonOp =
- new AbandonOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getIDToAbandon());
- abandonOp.setAttachment(REACTIVE_OUT, out);
+ @Override
+ public String getMonitorSummary() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("connID=\"");
+ buffer.append(connectionID);
+ buffer.append("\" connectTime=\"");
+ buffer.append(getConnectTimeString());
+ buffer.append("\" source=\"");
+ buffer.append(clientAddress);
+ buffer.append(":");
+ buffer.append(clientPort);
+ buffer.append("\" destination=\"");
+ buffer.append(serverAddress);
+ buffer.append(":");
+ buffer.append(connectionHandler.getListeners().iterator().next().getPort());
+ buffer.append("\" ldapVersion=\"");
+ buffer.append(ldapVersion);
+ buffer.append("\" authDN=\"");
- try
- {
- addOperationInProgress(queueingStrategy, abandonOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- // Don't send an error response since abandon operations
- // don't have a response.
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as an add request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the add request to process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processAddRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- // Create the add operation and add it into the work queue.
- AddRequestProtocolOp protocolOp = message.getAddRequestProtocolOp();
- AddOperationBasis addOp =
- new AddOperationBasis(this, nextOperationID.getAndIncrement(),
- message.getMessageID(), controls, protocolOp.getDN(),
- protocolOp.getAttributes());
- addOp.setAttachment(REACTIVE_OUT, out);
-
- try
- {
- addOperationInProgress(queueingStrategy, addOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- final Result result = Responses.newResult(de.getResultCode())
- .setDiagnosticMessage(de.getLocalizedMessage())
- .setMatchedDN(de.getMatchedDN().toString());
- for(String referral : de.getReferralURLs()) {
- result.addReferralURI(referral);
- }
-
- out.onNext(result);
- out.onComplete();
- }
-
- return connectionValid;
- }
-
- private void disconnectControlsNotAllowed()
- {
- disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
- }
-
- /**
- * Processes the provided LDAP message as a bind request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the bind request to process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processBindRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- BindRequestProtocolOp protocolOp =
- message.getBindRequestProtocolOp();
-
- // See if this is an LDAPv2 bind request, and if so whether that
- // should be allowed.
- String versionString;
- switch (ldapVersion = protocolOp.getProtocolVersion())
- {
- case 2:
- versionString = "2";
-
- if (!connectionHandler.allowLDAPv2())
- {
- out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
- return false;
- }
-
- if (!controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- break;
- case 3:
- versionString = "3";
- break;
- default:
- // Unsupported protocol version. RFC4511 states that we MUST send
- // a protocol error back to the client.
- out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion).toString()));
- out.onComplete();
- disconnect(DisconnectReason.PROTOCOL_ERROR, false,
- ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion));
- return false;
- }
-
- ByteString bindDN = protocolOp.getDN();
-
- BindOperationBasis bindOp;
- switch (protocolOp.getAuthenticationType())
- {
- case SIMPLE:
- bindOp =
- new BindOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- versionString, bindDN, protocolOp.getSimplePassword());
- break;
- case SASL:
- bindOp =
- new BindOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- versionString, bindDN, protocolOp.getSASLMechanism(),
- protocolOp.getSASLCredentials());
- break;
- default:
- // This is an invalid authentication type, and therefore a
- // protocol error. As per RFC 2251, a protocol error in a bind
- // request must result in terminating the connection.
- LocalizableMessage msg =
- ERR_LDAP_INVALID_BIND_AUTH_TYPE.get(message.getMessageID(),
- protocolOp.getAuthenticationType());
- disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
- return false;
- }
-
- // Add the operation into the work queue.
- bindOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, bindOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- final Result result = Responses.newBindResult(de.getResultCode())
- .setDiagnosticMessage(de.getLocalizedMessage())
- .setMatchedDN(de.getMatchedDN().toString());
- for(String referral : de.getReferralURLs())
- {
- result.addReferralURI(referral);
- }
-
- out.onNext(result);
- out.onComplete();
-
- // If it was a protocol error, then terminate the connection.
- if (de.getResultCode() == ResultCode.PROTOCOL_ERROR)
- {
- LocalizableMessage msg =
- ERR_LDAP_DISCONNECT_DUE_TO_BIND_PROTOCOL_ERROR.get(message
- .getMessageID(), de.getMessageObject());
- disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
- }
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as a compare request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the compare request to
- * process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processCompareRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- CompareRequestProtocolOp protocolOp =
- message.getCompareRequestProtocolOp();
- CompareOperationBasis compareOp =
- new CompareOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getDN(), protocolOp.getAttributeType(),
- protocolOp.getAssertionValue());
-
- // Add the operation into the work queue.
- compareOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, compareOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- final CompareResult result = Responses.newCompareResult(de.getResultCode())
- .setDiagnosticMessage(de.getLocalizedMessage())
- .setMatchedDN(de.getMatchedDN().toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
- out.onNext(result);
- out.onComplete();
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as a delete request.
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the delete request to process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processDeleteRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage( ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- DeleteRequestProtocolOp protocolOp =
- message.getDeleteRequestProtocolOp();
- DeleteOperationBasis deleteOp =
- new DeleteOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getDN());
-
- // Add the operation into the work queue.
- deleteOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, deleteOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- final Result result = Responses.newResult(de.getResultCode())
- .setDiagnosticMessage(de.getLocalizedMessage())
- .setMatchedDN(de.getMatchedDN().toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
-
- out.onNext(result);
- out.onComplete();
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as an extended request.
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the extended request to
- * process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processExtendedRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- // See if this is an LDAPv2 client. If it is, then they should not
- // be issuing extended requests. We can't send a response that we
- // can be sure they can understand, so we have no choice but to
- // close the connection.
- if (ldapVersion == 2)
- {
- // LDAPv2 clients aren't allowed to send controls.
- LocalizableMessage msg = ERR_LDAPV2_EXTENDED_REQUEST_NOT_ALLOWED.get(getConnectionID(), message.getMessageID());
-
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(msg.toString()));
- out.onComplete();
-
- logger.error(msg);
- disconnect(DisconnectReason.PROTOCOL_ERROR, false, msg);
- return false;
- }
-
- // FIXME -- Do we need to handle certain types of request here?
- // -- StartTLS requests
- // -- Cancel requests
-
- ExtendedRequestProtocolOp protocolOp =
- message.getExtendedRequestProtocolOp();
- ExtendedOperationBasis extendedOp =
- new ExtendedOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getOID(), protocolOp.getValue());
-
- // Add the operation into the work queue.
- extendedOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, extendedOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
- final Result result = Responses.newResult(de.getResultCode())
- .setDiagnosticMessage(de.getMessage())
- .setMatchedDN(de.getMatchedDN().toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
-
- out.onNext(result);
- out.onComplete();
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as a modify request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the modify request to process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processModifyRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- ModifyRequestProtocolOp protocolOp =
- message.getModifyRequestProtocolOp();
- ModifyOperationBasis modifyOp =
- new ModifyOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getDN(), protocolOp.getModifications());
-
- // Add the operation into the work queue.
- modifyOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, modifyOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
- final Result result =
- Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage()).setMatchedDN(de.getMatchedDN()
- .toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
-
- out.onNext(result);
- out.onComplete();
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as a modify DN request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the modify DN request to
- * process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processModifyDNRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- ModifyDNRequestProtocolOp protocolOp =
- message.getModifyDNRequestProtocolOp();
- ModifyDNOperationBasis modifyDNOp =
- new ModifyDNOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getEntryDN(), protocolOp.getNewRDN(), protocolOp
- .deleteOldRDN(), protocolOp.getNewSuperior());
-
- // Add the operation into the work queue.
- modifyDNOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, modifyDNOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- final Result result =
- Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage()).setMatchedDN(de.getMatchedDN()
- .toString());
- result.getReferralURIs().addAll(de.getReferralURLs());
- for(Control control : modifyDNOp.getResponseControls()) {
- result.addControl(Converters.from(control));
- }
- out.onNext(result);
- out.onComplete();
- }
-
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as a search request.
- *
- * @param queueingStrategy
- * The {@link QueueingStrategy} to use for operation
- * @param message
- * The LDAP message containing the search request to process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processSearchRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
- final List<Control> controls, final FlowableEmitter<Response> out)
- {
- if (ldapVersion == 2 && !controls.isEmpty())
- {
- // LDAPv2 clients aren't allowed to send controls.
- out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
- .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
- out.onComplete();
- disconnectControlsNotAllowed();
- return false;
- }
-
- SearchRequestProtocolOp protocolOp =
- message.getSearchRequestProtocolOp();
- SearchOperationBasis searchOp =
- new SearchOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls,
- protocolOp.getBaseDN(), protocolOp.getScope(), protocolOp
- .getDereferencePolicy(), protocolOp.getSizeLimit(),
- protocolOp.getTimeLimit(), protocolOp.getTypesOnly(),
- protocolOp.getFilter(), protocolOp.getAttributes());
-
- // Add the operation into the work queue.
- searchOp.setAttachment(REACTIVE_OUT, out);
- try
- {
- addOperationInProgress(queueingStrategy, searchOp);
- }
- catch (DirectoryException de)
- {
- logger.traceException(de);
-
- final Result result = Responses.newResult(de.getResultCode());
- if (de.getMessage() != null)
- {
- result.setDiagnosticMessage(de.getMessage());
- }
- if (de.getMatchedDN() != null)
- {
- result.setMatchedDN(de.getMatchedDN().toString());
- }
- if (de.getReferralURLs() != null)
- {
- result.getReferralURIs().addAll(de.getReferralURLs());
- }
- if (searchOp.getResponseControls() != null)
- {
- for (Control control : searchOp.getResponseControls())
- {
- result.addControl(Converters.from(control));
+ DN authDN = getAuthenticationInfo().getAuthenticationDN();
+ if (authDN != null) {
+ buffer.append(authDN);
}
- }
- out.onNext(result);
- out.onComplete();
+
+ buffer.append("\" security=\"");
+ buffer.append("none");
+
+ buffer.append("\" opsInProgress=\"");
+ buffer.append(operationsInProgress.size());
+ buffer.append("\"");
+
+ int countPSearch = getPersistentSearches().size();
+ if (countPSearch > 0) {
+ buffer.append(" persistentSearches=\"");
+ buffer.append(countPSearch);
+ buffer.append("\"");
+ }
+ return buffer.toString();
}
- return connectionValid;
- }
-
- /**
- * Processes the provided LDAP message as an unbind request.
- *
- * @param message
- * The LDAP message containing the unbind request to process.
- * @param controls
- * The set of pre-decoded request controls contained in the
- * message.
- * @return <CODE>true</CODE> if the request was processed
- * successfully, or <CODE>false</CODE> if not and the
- * connection has been closed as a result (it is the
- * responsibility of this method to close the connection).
- */
- private boolean processUnbindRequest(final LDAPMessage message, final List<Control> controls)
- {
- UnbindOperationBasis unbindOp =
- new UnbindOperationBasis(this, nextOperationID
- .getAndIncrement(), message.getMessageID(), controls);
-
- unbindOp.run();
-
- // The client connection will never be valid after an unbind.
- return false;
- }
-
- @Override
- public String getMonitorSummary()
- {
- StringBuilder buffer = new StringBuilder();
- buffer.append("connID=\"");
- buffer.append(connectionID);
- buffer.append("\" connectTime=\"");
- buffer.append(getConnectTimeString());
- buffer.append("\" source=\"");
- buffer.append(clientAddress);
- buffer.append(":");
- buffer.append(clientPort);
- buffer.append("\" destination=\"");
- buffer.append(serverAddress);
- buffer.append(":");
- buffer.append(connectionHandler.getListeners().iterator().next().getPort());
- buffer.append("\" ldapVersion=\"");
- buffer.append(ldapVersion);
- buffer.append("\" authDN=\"");
-
- DN authDN = getAuthenticationInfo().getAuthenticationDN();
- if (authDN != null)
- {
- buffer.append(authDN);
+ /**
+ * Appends a string representation of this client connection to the provided buffer.
+ *
+ * @param buffer
+ * The buffer to which the information should be appended.
+ */
+ @Override
+ public void toString(StringBuilder buffer) {
+ buffer.append("LDAP client connection from ");
+ buffer.append(clientAddress);
+ buffer.append(":");
+ buffer.append(clientPort);
+ buffer.append(" to ");
+ buffer.append(serverAddress);
+ buffer.append(":");
+ buffer.append(serverPort);
}
- buffer.append("\" security=\"");
- buffer.append("none");
-
- buffer.append("\" opsInProgress=\"");
- buffer.append(operationsInProgress.size());
- buffer.append("\"");
-
- int countPSearch = getPersistentSearches().size();
- if (countPSearch > 0)
- {
- buffer.append(" persistentSearches=\"");
- buffer.append(countPSearch);
- buffer.append("\"");
+ @Override
+ public boolean prepareTLS(LocalizableMessageBuilder unavailableReason) {
+ throw new UnsupportedOperationException();
}
- return buffer.toString();
- }
- /**
- * Appends a string representation of this client connection to the
- * provided buffer.
- *
- * @param buffer
- * The buffer to which the information should be appended.
- */
- @Override
- public void toString(StringBuilder buffer)
- {
- buffer.append("LDAP client connection from ");
- buffer.append(clientAddress);
- buffer.append(":");
- buffer.append(clientPort);
- buffer.append(" to ");
- buffer.append(serverAddress);
- buffer.append(":");
- buffer.append(serverPort);
- }
-
- @Override
- public boolean prepareTLS(LocalizableMessageBuilder unavailableReason)
- {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Retrieves the length of time in milliseconds that this client
- * connection has been idle. <BR>
- * <BR>
- * Note that the default implementation will always return zero.
- * Subclasses associated with connection handlers should override this
- * method if they wish to provided idle time limit functionality.
- *
- * @return The length of time in milliseconds that this client
- * connection has been idle.
- */
- @Override
- public long getIdleTime()
- {
- if (operationsInProgress.isEmpty()
- && getPersistentSearches().isEmpty())
- {
- return TimeThread.getTime() - lastCompletionTime.get();
+ /**
+ * Retrieves the length of time in milliseconds that this client connection has been idle. <BR>
+ * <BR>
+ * Note that the default implementation will always return zero. Subclasses associated with connection handlers
+ * should override this method if they wish to provided idle time limit functionality.
+ *
+ * @return The length of time in milliseconds that this client connection has been idle.
+ */
+ @Override
+ public long getIdleTime() {
+ if (operationsInProgress.isEmpty() && getPersistentSearches().isEmpty()) {
+ return TimeThread.getTime() - lastCompletionTime.get();
+ } else {
+ // There's at least one operation in progress, so it's not idle.
+ return 0L;
+ }
}
- else
- {
- // There's at least one operation in progress, so it's not idle.
- return 0L;
+
+ /**
+ * Return the certificate chain array associated with a connection.
+ *
+ * @return The array of certificates associated with a connection.
+ */
+ public Certificate[] getClientCertificateChain() {
+ return new Certificate[0];
}
- }
- /**
- * Return the certificate chain array associated with a connection.
- *
- * @return The array of certificates associated with a connection.
- */
- public Certificate[] getClientCertificateChain()
- {
- return new Certificate[0];
- }
-
- @Override
- public int getSSF()
- {
- return 0;
- }
+ @Override
+ public int getSSF() {
+ return 0;
+ }
}
--
Gitblit v1.10.0