From 464f6dd52a1eeeef0f7b38d4dc1840501818a36f Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 3273 ++++++++++++++++++++++++++--------------------------------
 1 files changed, 1,472 insertions(+), 1,801 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 0e14bea..96304a7 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -105,1015 +105,848 @@
 import io.reactivex.FlowableOnSubscribe;
 
 /**
- * This class defines an LDAP client connection, which is a type of
- * client connection that will be accepted by an instance of the LDAP
- * connection handler and have its requests decoded by an LDAP request
- * handler.
+ * This class defines an LDAP client connection, which is a type of client connection that will be accepted by an
+ * instance of the LDAP connection handler and have its requests decoded by an LDAP request handler.
  */
-public final class LDAPClientConnection2 extends ClientConnection implements
-    TLSCapableConnection, ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>>
-{
-  private static final String REACTIVE_OUT = "reactive.out";
+public final class LDAPClientConnection2 extends ClientConnection implements TLSCapableConnection,
+        ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>> {
+    private static final String REACTIVE_OUT = "reactive.out";
 
-/** The tracer object for the debug logger. */
-  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+    /** The tracer object for the debug logger. */
+    private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
-  /** The time that the last operation was completed. */
-  private final AtomicLong lastCompletionTime;
-  /** The next operation ID that should be used for this connection. */
-  private final AtomicLong nextOperationID;
+    /** The time that the last operation was completed. */
+    private final AtomicLong lastCompletionTime;
+    /** The next operation ID that should be used for this connection. */
+    private final AtomicLong nextOperationID;
 
-  /**
-   * Indicates whether the Directory Server believes this connection to be valid
-   * and available for communication.
-   */
-  private volatile boolean connectionValid;
+    /**
+     * Indicates whether the Directory Server believes this connection to be valid and available for communication.
+     */
+    private volatile boolean connectionValid;
 
-  /**
-   * Indicates whether this connection is about to be closed. This will be used
-   * to prevent accepting new requests while a disconnect is in progress.
-   */
-  private boolean disconnectRequested;
+    /**
+     * Indicates whether this connection is about to be closed. This will be used to prevent accepting new requests
+     * while a disconnect is in progress.
+     */
+    private boolean disconnectRequested;
 
-  /**
-   * Indicates whether the connection should keep statistics regarding the
-   * operations that it is performing.
-   */
-  private final boolean keepStats;
+    /**
+     * Indicates whether the connection should keep statistics regarding the operations that it is performing.
+     */
+    private final boolean keepStats;
 
-  /** The set of all operations currently in progress on this connection. */
-  private final ConcurrentHashMap<Integer, Operation> operationsInProgress;
+    /** The set of all operations currently in progress on this connection. */
+    private final ConcurrentHashMap<Integer, Operation> operationsInProgress;
 
-  /**
-   * The number of operations performed on this connection. Used to compare with
-   * the resource limits of the network group.
-   */
-  private final AtomicLong operationsPerformed;
+    /**
+     * The number of operations performed on this connection. Used to compare with the resource limits of the network
+     * group.
+     */
+    private final AtomicLong operationsPerformed;
 
-  /** The port on the client from which this connection originated. */
-  private final int clientPort;
-  /** The LDAP version that the client is using to communicate with the server. */
-  private int ldapVersion;
-  /** The port on the server to which this client has connected. */
-  private final int serverPort;
+    /** The port on the client from which this connection originated. */
+    private final int clientPort;
+    /** The LDAP version that the client is using to communicate with the server. */
+    private int ldapVersion;
+    /** The port on the server to which this client has connected. */
+    private final int serverPort;
 
-  /** The reference to the connection handler that accepted this connection. */
-  private final LDAPConnectionHandler2 connectionHandler;
-  /** The statistics tracker associated with this client connection. */
-  private final LDAPStatistics statTracker;
-  private final boolean useNanoTime;
+    /** The reference to the connection handler that accepted this connection. */
+    private final LDAPConnectionHandler2 connectionHandler;
+    /** The statistics tracker associated with this client connection. */
+    private final LDAPStatistics statTracker;
+    private final boolean useNanoTime;
 
-  /** The connection ID assigned to this connection. */
-  private final long connectionID;
+    /** The connection ID assigned to this connection. */
+    private final long connectionID;
 
-  /** The lock used to provide threadsafe access to the set of operations in progress. */
-  private final Object opsInProgressLock;
+    /** The lock used to provide threadsafe access to the set of operations in progress. */
+    private final Object opsInProgressLock;
 
-  /** The socket channel with which this client connection is associated. */
-  private final LDAPClientContext clientContext;
+    /** The socket channel with which this client connection is associated. */
+    private final LDAPClientContext clientContext;
 
-  /** The string representation of the address of the client. */
-  private final String clientAddress;
-  /** The name of the protocol that the client is using to communicate with the server. */
-  private final String protocol;
-  /** The string representation of the address of the server to which the client has connected. */
-  private final String serverAddress;
+    /** The string representation of the address of the client. */
+    private final String clientAddress;
+    /** The name of the protocol that the client is using to communicate with the server. */
+    private final String protocol;
+    /** The string representation of the address of the server to which the client has connected. */
+    private final String serverAddress;
 
-  /**
-   * Creates a new LDAP client connection with the provided information.
-   *
-   * @param connectionHandler
-   *          The connection handler that accepted this connection.
-   * @param clientContext
-   *          The socket channel that may be used to communicate with
-   *          the client.
-   * @param  protocol String representing the protocol (LDAP or LDAP+SSL).
-   * @throws LdapException
-   * @throws DirectoryException If SSL initialisation fails.
-   */
-  LDAPClientConnection2(LDAPConnectionHandler2 connectionHandler, LDAPClientContext clientContext, String protocol,
-          boolean keepStats)
-  {
-    this.connectionHandler = connectionHandler;
-    this.clientContext = clientContext;
-    opsInProgressLock = new Object();
-    ldapVersion = 3;
-    lastCompletionTime = new AtomicLong(TimeThread.getTime());
-    nextOperationID = new AtomicLong(0);
-    connectionValid = true;
-    disconnectRequested = false;
-    operationsInProgress = new ConcurrentHashMap<>();
-    operationsPerformed = new AtomicLong(0);
-    this.keepStats = keepStats;
-    this.protocol = protocol;
+    /**
+     * Creates a new LDAP client connection with the provided information.
+     *
+     * @param connectionHandler
+     *            The connection handler that accepted this connection.
+     * @param clientContext
+     *            The socket channel that may be used to communicate with the client.
+     * @param protocol
+     *            String representing the protocol (LDAP or LDAP+SSL).
+     * @throws LdapException
+     * @throws DirectoryException
+     *             If SSL initialisation fails.
+     */
+    LDAPClientConnection2(LDAPConnectionHandler2 connectionHandler, LDAPClientContext clientContext, String protocol,
+            boolean keepStats) {
+        this.connectionHandler = connectionHandler;
+        this.clientContext = clientContext;
+        opsInProgressLock = new Object();
+        ldapVersion = 3;
+        lastCompletionTime = new AtomicLong(TimeThread.getTime());
+        nextOperationID = new AtomicLong(0);
+        connectionValid = true;
+        disconnectRequested = false;
+        operationsInProgress = new ConcurrentHashMap<>();
+        operationsPerformed = new AtomicLong(0);
+        this.keepStats = keepStats;
+        this.protocol = protocol;
 
-    clientAddress = clientContext.getPeerAddress().getAddress().getHostAddress();
-    clientPort = clientContext.getPeerAddress().getPort();
-    serverAddress = clientContext.getLocalAddress().getAddress().getHostAddress();
-    serverPort = clientContext.getLocalAddress().getPort();
+        clientAddress = clientContext.getPeerAddress().getAddress().getHostAddress();
+        clientPort = clientContext.getPeerAddress().getPort();
+        serverAddress = clientContext.getLocalAddress().getAddress().getHostAddress();
+        serverPort = clientContext.getLocalAddress().getPort();
 
-    statTracker = this.connectionHandler.getStatTracker();
-    if (keepStats)
-    {
-      statTracker.updateConnect();
-      this.useNanoTime = DirectoryServer.getUseNanoTime();
-    }
-    else
-    {
-      this.useNanoTime = false;
-    }
-
-    connectionID = DirectoryServer.newConnectionAccepted(this);
-    clientContext.onDisconnect(new DisconnectListener()
-    {
-      @Override
-      public void exceptionOccurred(LDAPClientContext context, Throwable error)
-      {
-        if (error instanceof LocalizableException)
-        {
-          disconnect(DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
-        }
-        else
-        {
-          disconnect(DisconnectReason.PROTOCOL_ERROR, true, null);
-        }
-      }
-
-      @Override
-      public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage)
-      {
-        disconnect(DisconnectReason.SERVER_ERROR, false, null);
-      }
-
-      @Override
-      public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest)
-      {
-        disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
-      }
-    });
-  }
-
-  /**
-   * Retrieves the connection ID assigned to this connection.
-   *
-   * @return The connection ID assigned to this connection.
-   */
-  @Override
-  public long getConnectionID()
-  {
-    return connectionID;
-  }
-
-  /**
-   * Retrieves the connection handler that accepted this client
-   * connection.
-   *
-   * @return The connection handler that accepted this client
-   *         connection.
-   */
-  @Override
-  public ConnectionHandler<?> getConnectionHandler()
-  {
-    return connectionHandler;
-  }
-
-  /**
-   * Retrieves the socket channel that can be used to communicate with
-   * the client.
-   *
-   * @return The socket channel that can be used to communicate with the
-   *         client.
-   */
-  @Override
-  public SocketChannel getSocketChannel()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Retrieves the protocol that the client is using to communicate with
-   * the Directory Server.
-   *
-   * @return The protocol that the client is using to communicate with
-   *         the Directory Server.
-   */
-  @Override
-  public String getProtocol()
-  {
-    return protocol;
-  }
-
-  /**
-   * Retrieves a string representation of the address of the client.
-   *
-   * @return A string representation of the address of the client.
-   */
-  @Override
-  public String getClientAddress()
-  {
-    return clientAddress;
-  }
-
-  /**
-   * Retrieves the port number for this connection on the client system.
-   *
-   * @return The port number for this connection on the client system.
-   */
-  @Override
-  public int getClientPort()
-  {
-    return clientPort;
-  }
-
-  /**
-   * Retrieves a string representation of the address on the server to
-   * which the client connected.
-   *
-   * @return A string representation of the address on the server to
-   *         which the client connected.
-   */
-  @Override
-  public String getServerAddress()
-  {
-    return serverAddress;
-  }
-
-  /**
-   * Retrieves the port number for this connection on the server system.
-   *
-   * @return The port number for this connection on the server system.
-   */
-  @Override
-  public int getServerPort()
-  {
-    return serverPort;
-  }
-
-  /**
-   * Retrieves the <CODE>java.net.InetAddress</CODE> associated with the
-   * remote client system.
-   *
-   * @return The <CODE>java.net.InetAddress</CODE> associated with the
-   *         remote client system. It may be <CODE>null</CODE> if the
-   *         client is not connected over an IP-based connection.
-   */
-  @Override
-  public InetAddress getRemoteAddress()
-  {
-    return clientContext.getPeerAddress().getAddress();
-  }
-
-  /**
-   * Retrieves the <CODE>java.net.InetAddress</CODE> for the Directory
-   * Server system to which the client has established the connection.
-   *
-   * @return The <CODE>java.net.InetAddress</CODE> for the Directory
-   *         Server system to which the client has established the
-   *         connection. It may be <CODE>null</CODE> if the client is
-   *         not connected over an IP-based connection.
-   */
-  @Override
-  public InetAddress getLocalAddress()
-  {
-    return clientContext.getLocalAddress().getAddress();
-  }
-
-  @Override
-  public boolean isConnectionValid()
-  {
-    return this.connectionValid;
-  }
-
-  /**
-   * Indicates whether this client connection is currently using a
-   * secure mechanism to communicate with the server. Note that this may
-   * change over time based on operations performed by the client or
-   * server (e.g., it may go from <CODE>false</CODE> to
-   * <CODE>true</CODE> if the client uses the StartTLS extended
-   * operation).
-   *
-   * @return <CODE>true</CODE> if the client connection is currently
-   *         using a secure mechanism to communicate with the server, or
-   *         <CODE>false</CODE> if not.
-   */
-  @Override
-  public boolean isSecure()
-  {
-    return false;
-  }
-
-  /**
-   * Sends a response to the client based on the information in the
-   * provided operation.
-   *
-   * @param operation
-   *          The operation for which to send the response.
-   */
-  @Override
-  public void sendResponse(Operation operation)
-  {
-    // Since this is the final response for this operation, we can go
-    // ahead and remove it from the "operations in progress" list. It
-    // can't be canceled after this point, and this will avoid potential
-    // race conditions in which the client immediately sends another
-    // request with the same message ID as was used for this operation.
-
-    if (keepStats) {
-        long time;
-        if (useNanoTime) {
-            time = operation.getProcessingNanoTime();
+        statTracker = this.connectionHandler.getStatTracker();
+        if (keepStats) {
+            statTracker.updateConnect();
+            this.useNanoTime = DirectoryServer.getUseNanoTime();
         } else {
-            time = operation.getProcessingTime();
+            this.useNanoTime = false;
         }
-        this.statTracker.updateOperationMonitoringData(
-                operation.getOperationType(),
-                time);
+
+        connectionID = DirectoryServer.newConnectionAccepted(this);
+        clientContext.onDisconnect(new DisconnectListener() {
+            @Override
+            public void exceptionOccurred(LDAPClientContext context, Throwable error) {
+                if (error instanceof LocalizableException) {
+                    disconnect(
+                            DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
+                } else {
+                    disconnect(DisconnectReason.PROTOCOL_ERROR, true, null);
+                }
+            }
+
+            @Override
+            public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
+                    String diagnosticMessage) {
+                disconnect(DisconnectReason.SERVER_ERROR, false, null);
+            }
+
+            @Override
+            public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
+                disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
+            }
+        });
     }
 
-    // Avoid sending the response if one has already been sent. This may happen
-    // if operation processing encounters a run-time exception after sending the
-    // response: the worker thread exception handling code will attempt to send
-    // an error result to the client indicating that a problem occurred.
-    if (removeOperationInProgress(operation.getMessageID()))
-    {
-      final Response response = operationToResponse(operation);
-      final FlowableEmitter<Response> out = getOut(operation);
-      if (response != null)
-      {
+    /**
+     * Retrieves the connection ID assigned to this connection.
+     *
+     * @return The connection ID assigned to this connection.
+     */
+    @Override
+    public long getConnectionID() {
+        return connectionID;
+    }
+
+    /**
+     * Retrieves the connection handler that accepted this client connection.
+     *
+     * @return The connection handler that accepted this client connection.
+     */
+    @Override
+    public ConnectionHandler<?> getConnectionHandler() {
+        return connectionHandler;
+    }
+
+    /**
+     * Retrieves the socket channel that can be used to communicate with the client.
+     *
+     * @return The socket channel that can be used to communicate with the client.
+     */
+    @Override
+    public SocketChannel getSocketChannel() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Retrieves the protocol that the client is using to communicate with the Directory Server.
+     *
+     * @return The protocol that the client is using to communicate with the Directory Server.
+     */
+    @Override
+    public String getProtocol() {
+        return protocol;
+    }
+
+    /**
+     * Retrieves a string representation of the address of the client.
+     *
+     * @return A string representation of the address of the client.
+     */
+    @Override
+    public String getClientAddress() {
+        return clientAddress;
+    }
+
+    /**
+     * Retrieves the port number for this connection on the client system.
+     *
+     * @return The port number for this connection on the client system.
+     */
+    @Override
+    public int getClientPort() {
+        return clientPort;
+    }
+
+    /**
+     * Retrieves a string representation of the address on the server to which the client connected.
+     *
+     * @return A string representation of the address on the server to which the client connected.
+     */
+    @Override
+    public String getServerAddress() {
+        return serverAddress;
+    }
+
+    /**
+     * Retrieves the port number for this connection on the server system.
+     *
+     * @return The port number for this connection on the server system.
+     */
+    @Override
+    public int getServerPort() {
+        return serverPort;
+    }
+
+    /**
+     * Retrieves the <CODE>java.net.InetAddress</CODE> associated with the remote client system.
+     *
+     * @return The <CODE>java.net.InetAddress</CODE> associated with the remote client system. It may be
+     *         <CODE>null</CODE> if the client is not connected over an IP-based connection.
+     */
+    @Override
+    public InetAddress getRemoteAddress() {
+        return clientContext.getPeerAddress().getAddress();
+    }
+
+    /**
+     * Retrieves the <CODE>java.net.InetAddress</CODE> for the Directory Server system to which the client has
+     * established the connection.
+     *
+     * @return The <CODE>java.net.InetAddress</CODE> for the Directory Server system to which the client has established
+     *         the connection. It may be <CODE>null</CODE> if the client is not connected over an IP-based connection.
+     */
+    @Override
+    public InetAddress getLocalAddress() {
+        return clientContext.getLocalAddress().getAddress();
+    }
+
+    @Override
+    public boolean isConnectionValid() {
+        return this.connectionValid;
+    }
+
+    /**
+     * Indicates whether this client connection is currently using a secure mechanism to communicate with the server.
+     * Note that this may change over time based on operations performed by the client or server (e.g., it may go from
+     * <CODE>false</CODE> to <CODE>true</CODE> if the client uses the StartTLS extended operation).
+     *
+     * @return <CODE>true</CODE> if the client connection is currently using a secure mechanism to communicate with the
+     *         server, or <CODE>false</CODE> if not.
+     */
+    @Override
+    public boolean isSecure() {
+        return false;
+    }
+
+    /**
+     * Sends a response to the client based on the information in the provided operation.
+     *
+     * @param operation
+     *            The operation for which to send the response.
+     */
+    @Override
+    public void sendResponse(Operation operation) {
+        // Since this is the final response for this operation, we can go
+        // ahead and remove it from the "operations in progress" list. It
+        // can't be canceled after this point, and this will avoid potential
+        // race conditions in which the client immediately sends another
+        // request with the same message ID as was used for this operation.
+
+        if (keepStats) {
+            long time;
+            if (useNanoTime) {
+                time = operation.getProcessingNanoTime();
+            } else {
+                time = operation.getProcessingTime();
+            }
+            this.statTracker.updateOperationMonitoringData(operation.getOperationType(), time);
+        }
+
+        // Avoid sending the response if one has already been sent. This may happen
+        // if operation processing encounters a run-time exception after sending the
+        // response: the worker thread exception handling code will attempt to send
+        // an error result to the client indicating that a problem occurred.
+        if (removeOperationInProgress(operation.getMessageID())) {
+            final Response response = operationToResponse(operation);
+            final FlowableEmitter<Response> out = getOut(operation);
+            if (response != null) {
+                out.onNext(response);
+            }
+            out.onComplete();
+        }
+    }
+
+    /**
+     * Retrieves an LDAPMessage containing a response generated from the provided operation.
+     *
+     * @param operation
+     *            The operation to use to generate the response LDAPMessage.
+     * @return An LDAPMessage containing a response generated from the provided operation.
+     */
+    private Response operationToResponse(Operation operation) {
+        ResultCode resultCode = operation.getResultCode();
+        if (resultCode == null) {
+            // This must mean that the operation has either not yet completed
+            // or that it completed without a result for some reason. In any
+            // case, log a message and set the response to "operations error".
+            logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_NO_RESULT_CODE, operation.getOperationType(),
+                    operation.getConnectionID(), operation.getOperationID());
+            resultCode = DirectoryServer.getServerErrorResultCode();
+        }
+
+        LocalizableMessageBuilder errorMessage = operation.getErrorMessage();
+        String matchedDN = operation.getMatchedDN() != null ? operation.getMatchedDN().toString() : null;
+
+        // Referrals are not allowed for LDAPv2 clients.
+        List<String> referralURLs;
+        if (ldapVersion == 2) {
+            referralURLs = null;
+
+            if (resultCode == ResultCode.REFERRAL) {
+                resultCode = ResultCode.CONSTRAINT_VIOLATION;
+                errorMessage.append(ERR_LDAPV2_REFERRAL_RESULT_CHANGED.get());
+            }
+
+            List<String> opReferrals = operation.getReferralURLs();
+            if (opReferrals != null && !opReferrals.isEmpty()) {
+                StringBuilder referralsStr = new StringBuilder();
+                Iterator<String> iterator = opReferrals.iterator();
+                referralsStr.append(iterator.next());
+
+                while (iterator.hasNext()) {
+                    referralsStr.append(", ");
+                    referralsStr.append(iterator.next());
+                }
+
+                errorMessage.append(ERR_LDAPV2_REFERRALS_OMITTED.get(referralsStr));
+            }
+        } else {
+            referralURLs = operation.getReferralURLs();
+        }
+
+        final Result result;
+        switch (operation.getOperationType()) {
+        case ADD:
+            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+                    .setMatchedDN(matchedDN);
+            break;
+        case BIND:
+            result = Responses.newBindResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+                    .setMatchedDN(matchedDN)
+                    .setServerSASLCredentials(((BindOperationBasis) operation).getServerSASLCredentials());
+            break;
+        case COMPARE:
+            result = Responses.newCompareResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+                    .setMatchedDN(matchedDN);
+            break;
+        case DELETE:
+            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+                    .setMatchedDN(matchedDN);
+            break;
+        case EXTENDED:
+            // If this an LDAPv2 client, then we can't send this.
+            if (ldapVersion == 2) {
+                logger.error(ERR_LDAPV2_SKIPPING_EXTENDED_RESPONSE, getConnectionID(), operation.getOperationID(),
+                        operation);
+                return null;
+            }
+
+            ExtendedOperationBasis extOp = (ExtendedOperationBasis) operation;
+            result = Responses.newGenericExtendedResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+                    .setMatchedDN(matchedDN).setOID(extOp.getResponseOID()).setValue(extOp.getResponseValue());
+            break;
+        case MODIFY:
+            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+                    .setMatchedDN(matchedDN);
+            break;
+        case MODIFY_DN:
+            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+                    .setMatchedDN(matchedDN);
+            break;
+        case SEARCH:
+            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
+                    .setMatchedDN(matchedDN);
+            break;
+        default:
+            // This must be a type of operation that doesn't have a response.
+            // This shouldn't happen, so log a message and return.
+            logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_INVALID_OP, operation.getOperationType(), getConnectionID(),
+                    operation.getOperationID(), operation);
+            return null;
+        }
+        if (referralURLs != null) {
+            result.getReferralURIs().addAll(referralURLs);
+        }
+
+        // Controls are not allowed for LDAPv2 clients.
+        if (ldapVersion != 2) {
+            for (Control control : operation.getResponseControls()) {
+                result.addControl(Converters.from(control));
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Sends the provided search result entry to the client.
+     *
+     * @param searchOperation
+     *            The search operation with which the entry is associated
+     * @param searchEntry
+     *            The search result entry to be sent to the client
+     */
+    @Override
+    public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry) {
+        getEmitter(searchOperation).onNext(toResponse(searchEntry));
+    }
+
+    private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation) {
+        return getOut(searchOperation);
+    }
+
+    private Response toResponse(SearchResultEntry searchEntry) {
+        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
+    }
+
+    private FlowableEmitter<Response> getOut(Operation operation) {
+        return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
+    }
+
+    /**
+     * Sends the provided search result reference to the client.
+     *
+     * @param searchOperation
+     *            The search operation with which the reference is associated.
+     * @param searchReference
+     *            The search result reference to be sent to the client.
+     * @return <CODE>true</CODE> if the client is able to accept referrals, or <CODE>false</CODE> if the client cannot
+     *         handle referrals and no more attempts should be made to send them for the associated search operation.
+     */
+    @Override
+    public boolean sendSearchReference(SearchOperation searchOperation, SearchResultReference searchReference) {
+        // Make sure this is not an LDAPv2 client. If it is, then they can't
+        // see referrals so we'll not send anything. Also, throw an
+        // exception so that the core server will know not to try sending
+        // any more referrals to this client for the rest of the operation.
+        if (ldapVersion == 2) {
+            logger.error(ERR_LDAPV2_SKIPPING_SEARCH_REFERENCE, getConnectionID(), searchOperation.getOperationID(),
+                    searchReference);
+            return false;
+        }
+
+        final FlowableEmitter<Response> out = getOut(searchOperation);
+        out.onNext(Converters.from(searchReference));
+
+        return true;
+    }
+
+    /**
+     * Sends the provided intermediate response message to the client.
+     *
+     * @param intermediateResponse
+     *            The intermediate response message to be sent.
+     * @return <CODE>true</CODE> if processing on the associated operation should continue, or <CODE>false</CODE> if
+     *         not.
+     */
+    @Override
+    protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse) {
+        final Operation operation = intermediateResponse.getOperation();
+        final FlowableEmitter<Response> out = getOut(operation);
+
+        final Response response = Responses.newGenericIntermediateResponse(intermediateResponse.getOID(),
+                intermediateResponse.getValue());
+        for (Control control : intermediateResponse.getControls()) {
+            response.addControl(Converters.from(control));
+        }
+
         out.onNext(response);
-      }
-      out.onComplete();
-    }
-  }
 
-  /**
-   * Retrieves an LDAPMessage containing a response generated from the
-   * provided operation.
-   *
-   * @param operation
-   *          The operation to use to generate the response LDAPMessage.
-   * @return An LDAPMessage containing a response generated from the
-   *         provided operation.
-   */
-  private Response operationToResponse(Operation operation)
-  {
-    ResultCode resultCode = operation.getResultCode();
-    if (resultCode == null)
-    {
-      // This must mean that the operation has either not yet completed
-      // or that it completed without a result for some reason. In any
-      // case, log a message and set the response to "operations error".
-      logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_NO_RESULT_CODE, operation.getOperationType(),
-          operation.getConnectionID(), operation.getOperationID());
-      resultCode = DirectoryServer.getServerErrorResultCode();
+        // The only reason we shouldn't continue processing is if the
+        // connection is closed.
+        return connectionValid;
     }
 
-    LocalizableMessageBuilder errorMessage = operation.getErrorMessage();
-    String matchedDN = operation.getMatchedDN() != null ? operation.getMatchedDN().toString() : null;
-
-    // Referrals are not allowed for LDAPv2 clients.
-    List<String> referralURLs;
-    if (ldapVersion == 2)
-    {
-      referralURLs = null;
-
-      if (resultCode == ResultCode.REFERRAL)
-      {
-        resultCode = ResultCode.CONSTRAINT_VIOLATION;
-        errorMessage.append(ERR_LDAPV2_REFERRAL_RESULT_CHANGED.get());
-      }
-
-      List<String> opReferrals = operation.getReferralURLs();
-      if (opReferrals != null && !opReferrals.isEmpty())
-      {
-        StringBuilder referralsStr = new StringBuilder();
-        Iterator<String> iterator = opReferrals.iterator();
-        referralsStr.append(iterator.next());
-
-        while (iterator.hasNext())
-        {
-          referralsStr.append(", ");
-          referralsStr.append(iterator.next());
-        }
-
-        errorMessage.append(ERR_LDAPV2_REFERRALS_OMITTED.get(referralsStr));
-      }
-    }
-    else
-    {
-      referralURLs = operation.getReferralURLs();
-    }
-
-    final Result result;
-    switch (operation.getOperationType())
-    {
-    case ADD:
-      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
-      break;
-    case BIND:
-      result = Responses.newBindResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN)
-                        .setServerSASLCredentials(((BindOperationBasis) operation).getServerSASLCredentials());
-      break;
-    case COMPARE:
-      result = Responses.newCompareResult(resultCode)
-                        .setDiagnosticMessage(errorMessage.toString())
-                        .setMatchedDN(matchedDN);
-      break;
-    case DELETE:
-      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
-      break;
-    case EXTENDED:
-      // If this an LDAPv2 client, then we can't send this.
-      if (ldapVersion == 2)
-      {
-        logger.error(ERR_LDAPV2_SKIPPING_EXTENDED_RESPONSE,
-            getConnectionID(), operation.getOperationID(), operation);
-        return null;
-      }
-
-      ExtendedOperationBasis extOp = (ExtendedOperationBasis) operation;
-      result = Responses.newGenericExtendedResult(resultCode)
-                        .setDiagnosticMessage(errorMessage.toString())
-                        .setMatchedDN(matchedDN)
-                        .setOID(extOp.getResponseOID()).setValue(extOp.getResponseValue());
-      break;
-    case MODIFY:
-      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
-      break;
-    case MODIFY_DN:
-      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
-      break;
-    case SEARCH:
-      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
-      break;
-    default:
-      // This must be a type of operation that doesn't have a response.
-      // This shouldn't happen, so log a message and return.
-      logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_INVALID_OP, operation.getOperationType(), getConnectionID(),
-          operation.getOperationID(), operation);
-      return null;
-    }
-    if (referralURLs != null)
-    {
-      result.getReferralURIs().addAll(referralURLs);
-    }
-
-    // Controls are not allowed for LDAPv2 clients.
-    if (ldapVersion != 2)
-    {
-      for(Control control : operation.getResponseControls()) {
-        result.addControl(Converters.from(control));
-      }
-    }
-
-    return result;
-  }
-
-  /**
-   * Sends the provided search result entry to the client.
-   *
-   * @param searchOperation
-   *          The search operation with which the entry is associated
-   * @param searchEntry
-   *          The search result entry to be sent to the client
-   */
-  @Override
-  public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry)
-  {
-    getEmitter(searchOperation).onNext(toResponse(searchEntry));
-  }
-
-  private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation)
-  {
-    return getOut(searchOperation);
-  }
-
-  private Response toResponse(SearchResultEntry searchEntry)
-  {
-    return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
-  }
-
-  private FlowableEmitter<Response> getOut(Operation operation)
-  {
-    return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
-  }
-
-  /**
-   * Sends the provided search result reference to the client.
-   *
-   * @param searchOperation
-   *          The search operation with which the reference is
-   *          associated.
-   * @param searchReference
-   *          The search result reference to be sent to the client.
-   * @return <CODE>true</CODE> if the client is able to accept
-   *         referrals, or <CODE>false</CODE> if the client cannot
-   *         handle referrals and no more attempts should be made to
-   *         send them for the associated search operation.
-   */
-  @Override
-  public boolean sendSearchReference(SearchOperation searchOperation, SearchResultReference searchReference)
-  {
-    // Make sure this is not an LDAPv2 client. If it is, then they can't
-    // see referrals so we'll not send anything. Also, throw an
-    // exception so that the core server will know not to try sending
-    // any more referrals to this client for the rest of the operation.
-    if (ldapVersion == 2)
-    {
-      logger.error(ERR_LDAPV2_SKIPPING_SEARCH_REFERENCE, getConnectionID(),
-              searchOperation.getOperationID(), searchReference);
-      return false;
-    }
-
-    final FlowableEmitter<Response> out = getOut(searchOperation);
-    out.onNext(Converters.from(searchReference));
-
-    return true;
-  }
-
-  /**
-   * Sends the provided intermediate response message to the client.
-   *
-   * @param intermediateResponse
-   *          The intermediate response message to be sent.
-   * @return <CODE>true</CODE> if processing on the associated operation
-   *         should continue, or <CODE>false</CODE> if not.
-   */
-  @Override
-  protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse)
-  {
-    final Operation operation = intermediateResponse.getOperation();
-    final FlowableEmitter<Response> out = getOut(operation);
-
-    final Response response =
-        Responses.newGenericIntermediateResponse(intermediateResponse.getOID(), intermediateResponse.getValue());
-    for (Control control : intermediateResponse.getControls())
-    {
-      response.addControl(Converters.from(control));
-    }
-
-    out.onNext(response);
-
-    // The only reason we shouldn't continue processing is if the
-    // connection is closed.
-    return connectionValid;
-  }
-
-  /**
-   * Closes the connection to the client, optionally sending it a
-   * message indicating the reason for the closure. Note that the
-   * ability to send a notice of disconnection may not be available for
-   * all protocols or under all circumstances.
-   *
-   * @param disconnectReason
-   *          The disconnect reason that provides the generic cause for
-   *          the disconnect.
-   * @param sendNotification
-   *          Indicates whether to try to provide notification to the
-   *          client that the connection will be closed.
-   * @param message
-   *          The message to include in the disconnect notification
-   *          response. It may be <CODE>null</CODE> if no message is to
-   *          be sent.
-   */
-  @Override
-  public void disconnect(DisconnectReason disconnectReason,
-      boolean sendNotification, LocalizableMessage message)
-  {
-    // Set a flag indicating that the connection is being terminated so
-    // that no new requests will be accepted. Also cancel all operations
-    // in progress.
-    synchronized (opsInProgressLock)
-    {
-      // If we are already in the middle of a disconnect, then don't
-      // do anything.
-      if (disconnectRequested)
-      {
-        return;
-      }
-
-      disconnectRequested = true;
-    }
-
-    if (keepStats)
-    {
-      statTracker.updateDisconnect();
-    }
-
-    if (connectionID >= 0)
-    {
-      DirectoryServer.connectionClosed(this);
-    }
-
-    // Indicate that this connection is no longer valid.
-    connectionValid = false;
-
-    final LocalizableMessage cancelMessage;
-    if (message != null)
-    {
-      cancelMessage = new LocalizableMessageBuilder()
-          .append(disconnectReason.getClosureMessage())
-          .append(": ")
-          .append(message)
-          .toMessage();
-    }
-    else
-    {
-      cancelMessage = disconnectReason.getClosureMessage();
-    }
-    cancelAllOperations(new CancelRequest(true, cancelMessage));
-    finalizeConnectionInternal();
-
-    // See if we should send a notification to the client. If so, then
-    // construct and send a notice of disconnection unsolicited
-    // response. Note that we cannot send this notification to an LDAPv2 client.
-    if (sendNotification && ldapVersion != 2)
-    {
-      try
-      {
-        LocalizableMessage errMsg = message != null ? message : INFO_LDAP_CLIENT_GENERIC_NOTICE_OF_DISCONNECTION.get();
-        clientContext.disconnect(ResultCode.valueOf(toResultCode(disconnectReason)), errMsg.toString());
-      }
-      catch (Exception e)
-      {
-        // NYI -- Log a message indicating that we couldn't send the
-        // notice of disconnection.
-        logger.traceException(e);
-      }
-    }
-    else
-    {
-        clientContext.disconnect();
-    }
-
-    // NYI -- Deregister the client connection from any server components that
-    // might know about it.
-
-    logDisconnect(this, disconnectReason, message);
-
-    try
-    {
-      PluginConfigManager pluginManager = DirectoryServer.getPluginConfigManager();
-      pluginManager.invokePostDisconnectPlugins(this, disconnectReason, message);
-    }
-    catch (Exception e)
-    {
-      logger.traceException(e);
-    }
-  }
-
-  private int toResultCode(DisconnectReason disconnectReason)
-  {
-    switch (disconnectReason)
-    {
-    case PROTOCOL_ERROR:
-      return LDAPResultCode.PROTOCOL_ERROR;
-    case SERVER_SHUTDOWN:
-      return LDAPResultCode.UNAVAILABLE;
-    case SERVER_ERROR:
-      return DirectoryServer.getServerErrorResultCode().intValue();
-    case ADMIN_LIMIT_EXCEEDED:
-    case IDLE_TIME_LIMIT_EXCEEDED:
-    case MAX_REQUEST_SIZE_EXCEEDED:
-    case IO_TIMEOUT:
-      return LDAPResultCode.ADMIN_LIMIT_EXCEEDED;
-    case CONNECTION_REJECTED:
-      return LDAPResultCode.CONSTRAINT_VIOLATION;
-    case INVALID_CREDENTIALS:
-      return LDAPResultCode.INVALID_CREDENTIALS;
-    default:
-      return LDAPResultCode.OTHER;
-    }
-  }
-
-  /**
-   * Retrieves the set of operations in progress for this client
-   * connection. This list must not be altered by any caller.
-   *
-   * @return The set of operations in progress for this client
-   *         connection.
-   */
-  @Override
-  public Collection<Operation> getOperationsInProgress()
-  {
-    return operationsInProgress.values();
-  }
-
-  /**
-   * Retrieves the operation in progress with the specified message ID.
-   *
-   * @param messageID
-   *          The message ID for the operation to retrieve.
-   * @return The operation in progress with the specified message ID, or
-   *         <CODE>null</CODE> if no such operation could be found.
-   */
-  @Override
-  public Operation getOperationInProgress(int messageID)
-  {
-    return operationsInProgress.get(messageID);
-  }
-
-  /**
-   * Adds the provided operation to the set of operations in progress
-   * for this client connection.
-   *
-   * @param operation
-   *          The operation to add to the set of operations in progress
-   *          for this client connection.
-   * @throws DirectoryException
-   *           If the operation is not added for some reason (e.g., the
-   *           client already has reached the maximum allowed concurrent
-   *           requests).
-   */
-  private void addOperationInProgress(final QueueingStrategy queueingStrategy, Operation operation)
-      throws DirectoryException
-  {
-    int messageID = operation.getMessageID();
-
-    // We need to grab a lock to ensure that no one else can add
-    // operations to the queue while we are performing some preliminary
-    // checks.
-    try
-    {
-      synchronized (opsInProgressLock)
-      {
-        // If we're already in the process of disconnecting the client,
-        // then reject the operation.
-        if (disconnectRequested)
-        {
-          LocalizableMessage message = WARN_CLIENT_DISCONNECT_IN_PROGRESS.get();
-          throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
-              message);
-        }
-
-        // Add the operation to the list of operations in progress for
-        // this connection.
-        Operation op = operationsInProgress.putIfAbsent(messageID, operation);
-
-        // See if there is already an operation in progress with the
-        // same message ID. If so, then we can't allow it.
-        if (op != null)
-        {
-          LocalizableMessage message =
-            WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
-          throw new DirectoryException(ResultCode.PROTOCOL_ERROR,
-              message);
-        }
-      }
-
-      // Try to add the operation to the work queue,
-      // or run it synchronously (typically for the administration
-      // connector)
-      queueingStrategy.enqueueRequest(operation);
-    }
-    catch (DirectoryException de)
-    {
-      logger.traceException(de);
-
-      operationsInProgress.remove(messageID);
-      lastCompletionTime.set(TimeThread.getTime());
-
-      throw de;
-    }
-    catch (Exception e)
-    {
-      logger.traceException(e);
-
-      LocalizableMessage message =
-        WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
-      throw new DirectoryException(DirectoryServer
-          .getServerErrorResultCode(), message, e);
-    }
-  }
-
-  /**
-   * Removes the provided operation from the set of operations in
-   * progress for this client connection. Note that this does not make
-   * any attempt to cancel any processing that may already be in
-   * progress for the operation.
-   *
-   * @param messageID
-   *          The message ID of the operation to remove from the set of
-   *          operations in progress.
-   * @return <CODE>true</CODE> if the operation was found and removed
-   *         from the set of operations in progress, or
-   *         <CODE>false</CODE> if not.
-   */
-  @Override
-  public boolean removeOperationInProgress(int messageID)
-  {
-    Operation operation = operationsInProgress.remove(messageID);
-    if (operation == null)
-    {
-      return false;
-    }
-
-    if (operation.getOperationType() == OperationType.ABANDON
-        && keepStats
-        && operation.getResultCode() == ResultCode.CANCELLED)
-    {
-      statTracker.updateAbandonedOperation();
-    }
-
-    lastCompletionTime.set(TimeThread.getTime());
-    return true;
-  }
-
-  /**
-   * Attempts to cancel the specified operation.
-   *
-   * @param messageID
-   *          The message ID of the operation to cancel.
-   * @param cancelRequest
-   *          An object providing additional information about how the
-   *          cancel should be processed.
-   * @return A cancel result that either indicates that the cancel was
-   *         successful or provides a reason that it was not.
-   */
-  @Override
-  public CancelResult cancelOperation(int messageID,
-      CancelRequest cancelRequest)
-  {
-    Operation op = operationsInProgress.get(messageID);
-    if (op != null)
-    {
-      return op.cancel(cancelRequest);
-    }
-
-    // See if the operation is in the list of persistent searches.
-    for (PersistentSearch ps : getPersistentSearches())
-    {
-      if (ps.getMessageID() == messageID)
-      {
-        // We only need to find the first persistent search
-        // associated with the provided message ID. The persistent search
-        // will ensure that all other related persistent searches are cancelled.
-        return ps.cancel();
-      }
-    }
-    return new CancelResult(ResultCode.NO_SUCH_OPERATION, null);
-  }
-
-  /**
-   * Attempts to cancel all operations in progress on this connection.
-   *
-   * @param cancelRequest
-   *          An object providing additional information about how the
-   *          cancel should be processed.
-   */
-  @Override
-  public void cancelAllOperations(CancelRequest cancelRequest)
-  {
-    // Make sure that no one can add any new operations.
-    synchronized (opsInProgressLock)
-    {
-      try
-      {
-        for (Operation o : operationsInProgress.values())
-        {
-          try
-          {
-            o.abort(cancelRequest);
-
-            // TODO: Assume its cancelled?
-            if (keepStats)
-            {
-              statTracker.updateAbandonedOperation();
+    /**
+     * Closes the connection to the client, optionally sending it a message indicating the reason for the closure. Note
+     * that the ability to send a notice of disconnection may not be available for all protocols or under all
+     * circumstances.
+     *
+     * @param disconnectReason
+     *            The disconnect reason that provides the generic cause for the disconnect.
+     * @param sendNotification
+     *            Indicates whether to try to provide notification to the client that the connection will be closed.
+     * @param message
+     *            The message to include in the disconnect notification response. It may be <CODE>null</CODE> if no
+     *            message is to be sent.
+     */
+    @Override
+    public void disconnect(DisconnectReason disconnectReason, boolean sendNotification, LocalizableMessage message) {
+        // Set a flag indicating that the connection is being terminated so
+        // that no new requests will be accepted. Also cancel all operations
+        // in progress.
+        synchronized (opsInProgressLock) {
+            // If we are already in the middle of a disconnect, then don't
+            // do anything.
+            if (disconnectRequested) {
+                return;
             }
-          }
-          catch (Exception e)
-          {
+
+            disconnectRequested = true;
+        }
+
+        if (keepStats) {
+            statTracker.updateDisconnect();
+        }
+
+        if (connectionID >= 0) {
+            DirectoryServer.connectionClosed(this);
+        }
+
+        // Indicate that this connection is no longer valid.
+        connectionValid = false;
+
+        final LocalizableMessage cancelMessage;
+        if (message != null) {
+            cancelMessage = new LocalizableMessageBuilder().append(disconnectReason.getClosureMessage()).append(": ")
+                    .append(message).toMessage();
+        } else {
+            cancelMessage = disconnectReason.getClosureMessage();
+        }
+        cancelAllOperations(new CancelRequest(true, cancelMessage));
+        finalizeConnectionInternal();
+
+        // See if we should send a notification to the client. If so, then
+        // construct and send a notice of disconnection unsolicited
+        // response. Note that we cannot send this notification to an LDAPv2 client.
+        if (sendNotification && ldapVersion != 2) {
+            try {
+                LocalizableMessage errMsg = message != null ? message
+                        : INFO_LDAP_CLIENT_GENERIC_NOTICE_OF_DISCONNECTION.get();
+                clientContext.disconnect(ResultCode.valueOf(toResultCode(disconnectReason)), errMsg.toString());
+            } catch (Exception e) {
+                // NYI -- Log a message indicating that we couldn't send the
+                // notice of disconnection.
+                logger.traceException(e);
+            }
+        } else {
+            clientContext.disconnect();
+        }
+
+        // NYI -- Deregister the client connection from any server components that
+        // might know about it.
+
+        logDisconnect(this, disconnectReason, message);
+
+        try {
+            PluginConfigManager pluginManager = DirectoryServer.getPluginConfigManager();
+            pluginManager.invokePostDisconnectPlugins(this, disconnectReason, message);
+        } catch (Exception e) {
             logger.traceException(e);
-          }
         }
-
-        if (!operationsInProgress.isEmpty()
-            || !getPersistentSearches().isEmpty())
-        {
-          lastCompletionTime.set(TimeThread.getTime());
-        }
-
-        operationsInProgress.clear();
-
-        for (PersistentSearch persistentSearch : getPersistentSearches())
-        {
-          persistentSearch.cancel();
-        }
-      }
-      catch (Exception e)
-      {
-        logger.traceException(e);
-      }
     }
-  }
 
-  /**
-   * Attempts to cancel all operations in progress on this connection
-   * except the operation with the specified message ID.
-   *
-   * @param cancelRequest
-   *          An object providing additional information about how the
-   *          cancel should be processed.
-   * @param messageID
-   *          The message ID of the operation that should not be
-   *          canceled.
-   */
-  @Override
-  public void cancelAllOperationsExcept(CancelRequest cancelRequest,
-      int messageID)
-  {
-    // Make sure that no one can add any new operations.
-    synchronized (opsInProgressLock)
-    {
-      try
-      {
-        for (int msgID : operationsInProgress.keySet())
-        {
-          if (msgID == messageID)
-          {
-            continue;
-          }
-
-          Operation o = operationsInProgress.get(msgID);
-          if (o != null)
-          {
-            try
-            {
-              o.abort(cancelRequest);
-
-              // TODO: Assume its cancelled?
-              if (keepStats)
-              {
-                statTracker.updateAbandonedOperation();
-              }
-            }
-            catch (Exception e)
-            {
-              logger.traceException(e);
-            }
-          }
-
-          operationsInProgress.remove(msgID);
-          lastCompletionTime.set(TimeThread.getTime());
+    private int toResultCode(DisconnectReason disconnectReason) {
+        switch (disconnectReason) {
+        case PROTOCOL_ERROR:
+            return LDAPResultCode.PROTOCOL_ERROR;
+        case SERVER_SHUTDOWN:
+            return LDAPResultCode.UNAVAILABLE;
+        case SERVER_ERROR:
+            return DirectoryServer.getServerErrorResultCode().intValue();
+        case ADMIN_LIMIT_EXCEEDED:
+        case IDLE_TIME_LIMIT_EXCEEDED:
+        case MAX_REQUEST_SIZE_EXCEEDED:
+        case IO_TIMEOUT:
+            return LDAPResultCode.ADMIN_LIMIT_EXCEEDED;
+        case CONNECTION_REJECTED:
+            return LDAPResultCode.CONSTRAINT_VIOLATION;
+        case INVALID_CREDENTIALS:
+            return LDAPResultCode.INVALID_CREDENTIALS;
+        default:
+            return LDAPResultCode.OTHER;
         }
-
-        for (PersistentSearch persistentSearch : getPersistentSearches())
-        {
-          if (persistentSearch.getMessageID() == messageID)
-          {
-            continue;
-          }
-
-          persistentSearch.cancel();
-          lastCompletionTime.set(TimeThread.getTime());
-        }
-      }
-      catch (Exception e)
-      {
-        logger.traceException(e);
-      }
     }
-  }
 
-  @Override
-  public Selector getWriteSelector()
-  {
-    throw new UnsupportedOperationException();
-  }
+    /**
+     * Retrieves the set of operations in progress for this client connection. This list must not be altered by any
+     * caller.
+     *
+     * @return The set of operations in progress for this client connection.
+     */
+    @Override
+    public Collection<Operation> getOperationsInProgress() {
+        return operationsInProgress.values();
+    }
 
-  @Override
-  public long getMaxBlockedWriteTimeLimit()
-  {
-    return connectionHandler.getMaxBlockedWriteTimeLimit();
-  }
+    /**
+     * Retrieves the operation in progress with the specified message ID.
+     *
+     * @param messageID
+     *            The message ID for the operation to retrieve.
+     * @return The operation in progress with the specified message ID, or <CODE>null</CODE> if no such operation could
+     *         be found.
+     */
+    @Override
+    public Operation getOperationInProgress(int messageID) {
+        return operationsInProgress.get(messageID);
+    }
 
-  /**
-   * Returns the total number of operations initiated on this
-   * connection.
-   *
-   * @return the total number of operations on this connection
-   */
-  @Override
-  public long getNumberOfOperations()
-  {
-    return operationsPerformed.get();
-  }
+    /**
+     * Adds the provided operation to the set of operations in progress for this client connection.
+     *
+     * @param operation
+     *            The operation to add to the set of operations in progress for this client connection.
+     * @throws DirectoryException
+     *             If the operation is not added for some reason (e.g., the client already has reached the maximum
+     *             allowed concurrent requests).
+     */
+    private void addOperationInProgress(final QueueingStrategy queueingStrategy, Operation operation)
+            throws DirectoryException {
+        int messageID = operation.getMessageID();
 
-  /**
-   * Processes the provided LDAP message read from the client and takes
-   * whatever action is appropriate. For most requests, this will
-   * include placing the operation in the work queue. Certain requests
-   * (in particular, abandons and unbinds) will be processed directly.
-   *
-   * @param queueingStrategy
-   *          The {@link QueueingStrategy} to use for operation
-   * @param message
-   *          The LDAP message to process.
-   * @return <CODE>true</CODE> if the appropriate action was taken for
-   *         the request, or <CODE>false</CODE> if there was a fatal
-   *         error and the client has been disconnected as a result, or
-   *         if the client unbound from the server.
-   */
+        // We need to grab a lock to ensure that no one else can add
+        // operations to the queue while we are performing some preliminary
+        // checks.
+        try {
+            synchronized (opsInProgressLock) {
+                // If we're already in the process of disconnecting the client,
+                // then reject the operation.
+                if (disconnectRequested) {
+                    LocalizableMessage message = WARN_CLIENT_DISCONNECT_IN_PROGRESS.get();
+                    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
+                }
+
+                // Add the operation to the list of operations in progress for
+                // this connection.
+                Operation op = operationsInProgress.putIfAbsent(messageID, operation);
+
+                // See if there is already an operation in progress with the
+                // same message ID. If so, then we can't allow it.
+                if (op != null) {
+                    LocalizableMessage message = WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
+                    throw new DirectoryException(ResultCode.PROTOCOL_ERROR, message);
+                }
+            }
+
+            // Try to add the operation to the work queue,
+            // or run it synchronously (typically for the administration
+            // connector)
+            queueingStrategy.enqueueRequest(operation);
+        } catch (DirectoryException de) {
+            logger.traceException(de);
+
+            operationsInProgress.remove(messageID);
+            lastCompletionTime.set(TimeThread.getTime());
+
+            throw de;
+        } catch (Exception e) {
+            logger.traceException(e);
+
+            LocalizableMessage message = WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
+            throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message, e);
+        }
+    }
+
+    /**
+     * Removes the provided operation from the set of operations in progress for this client connection. Note that this
+     * does not make any attempt to cancel any processing that may already be in progress for the operation.
+     *
+     * @param messageID
+     *            The message ID of the operation to remove from the set of operations in progress.
+     * @return <CODE>true</CODE> if the operation was found and removed from the set of operations in progress, or
+     *         <CODE>false</CODE> if not.
+     */
+    @Override
+    public boolean removeOperationInProgress(int messageID) {
+        Operation operation = operationsInProgress.remove(messageID);
+        if (operation == null) {
+            return false;
+        }
+
+        if (operation.getOperationType() == OperationType.ABANDON && keepStats
+                && operation.getResultCode() == ResultCode.CANCELLED) {
+            statTracker.updateAbandonedOperation();
+        }
+
+        lastCompletionTime.set(TimeThread.getTime());
+        return true;
+    }
+
+    /**
+     * Attempts to cancel the specified operation.
+     *
+     * @param messageID
+     *            The message ID of the operation to cancel.
+     * @param cancelRequest
+     *            An object providing additional information about how the cancel should be processed.
+     * @return A cancel result that either indicates that the cancel was successful or provides a reason that it was
+     *         not.
+     */
+    @Override
+    public CancelResult cancelOperation(int messageID, CancelRequest cancelRequest) {
+        Operation op = operationsInProgress.get(messageID);
+        if (op != null) {
+            return op.cancel(cancelRequest);
+        }
+
+        // See if the operation is in the list of persistent searches.
+        for (PersistentSearch ps : getPersistentSearches()) {
+            if (ps.getMessageID() == messageID) {
+                // We only need to find the first persistent search
+                // associated with the provided message ID. The persistent search
+                // will ensure that all other related persistent searches are cancelled.
+                return ps.cancel();
+            }
+        }
+        return new CancelResult(ResultCode.NO_SUCH_OPERATION, null);
+    }
+
+    /**
+     * Attempts to cancel all operations in progress on this connection.
+     *
+     * @param cancelRequest
+     *            An object providing additional information about how the cancel should be processed.
+     */
+    @Override
+    public void cancelAllOperations(CancelRequest cancelRequest) {
+        // Make sure that no one can add any new operations.
+        synchronized (opsInProgressLock) {
+            try {
+                for (Operation o : operationsInProgress.values()) {
+                    try {
+                        o.abort(cancelRequest);
+
+                        // TODO: Assume its cancelled?
+                        if (keepStats) {
+                            statTracker.updateAbandonedOperation();
+                        }
+                    } catch (Exception e) {
+                        logger.traceException(e);
+                    }
+                }
+
+                if (!operationsInProgress.isEmpty() || !getPersistentSearches().isEmpty()) {
+                    lastCompletionTime.set(TimeThread.getTime());
+                }
+
+                operationsInProgress.clear();
+
+                for (PersistentSearch persistentSearch : getPersistentSearches()) {
+                    persistentSearch.cancel();
+                }
+            } catch (Exception e) {
+                logger.traceException(e);
+            }
+        }
+    }
+
+    /**
+     * Attempts to cancel all operations in progress on this connection except the operation with the specified message
+     * ID.
+     *
+     * @param cancelRequest
+     *            An object providing additional information about how the cancel should be processed.
+     * @param messageID
+     *            The message ID of the operation that should not be canceled.
+     */
+    @Override
+    public void cancelAllOperationsExcept(CancelRequest cancelRequest, int messageID) {
+        // Make sure that no one can add any new operations.
+        synchronized (opsInProgressLock) {
+            try {
+                for (int msgID : operationsInProgress.keySet()) {
+                    if (msgID == messageID) {
+                        continue;
+                    }
+
+                    Operation o = operationsInProgress.get(msgID);
+                    if (o != null) {
+                        try {
+                            o.abort(cancelRequest);
+
+                            // TODO: Assume its cancelled?
+                            if (keepStats) {
+                                statTracker.updateAbandonedOperation();
+                            }
+                        } catch (Exception e) {
+                            logger.traceException(e);
+                        }
+                    }
+
+                    operationsInProgress.remove(msgID);
+                    lastCompletionTime.set(TimeThread.getTime());
+                }
+
+                for (PersistentSearch persistentSearch : getPersistentSearches()) {
+                    if (persistentSearch.getMessageID() == messageID) {
+                        continue;
+                    }
+
+                    persistentSearch.cancel();
+                    lastCompletionTime.set(TimeThread.getTime());
+                }
+            } catch (Exception e) {
+                logger.traceException(e);
+            }
+        }
+    }
+
+    @Override
+    public Selector getWriteSelector() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMaxBlockedWriteTimeLimit() {
+        return connectionHandler.getMaxBlockedWriteTimeLimit();
+    }
+
+    /**
+     * Returns the total number of operations initiated on this connection.
+     *
+     * @return the total number of operations on this connection
+     */
+    @Override
+    public long getNumberOfOperations() {
+        return operationsPerformed.get();
+    }
+
+    /**
+     * Processes the provided LDAP message read from the client and takes whatever action is appropriate. For most
+     * requests, this will include placing the operation in the work queue. Certain requests (in particular, abandons
+     * and unbinds) will be processed directly.
+     *
+     * @param queueingStrategy
+     *            The {@link QueueingStrategy} to use for operation
+     * @param message
+     *            The LDAP message to process.
+     * @return <CODE>true</CODE> if the appropriate action was taken for the request, or <CODE>false</CODE> if there was
+     *         a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
+     */
     @Override
     public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
         return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
@@ -1124,855 +957,693 @@
         }, BackpressureMode.NONE).onBackpressureBuffer(64, null, BackpressureOverflowStrategy.ERROR)));
     }
 
-  private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
-          final FlowableEmitter<Response> out)
-  {
-    if (keepStats)
-    {
-      statTracker.updateMessageRead(message);
+    private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+            final FlowableEmitter<Response> out) {
+        if (keepStats) {
+            statTracker.updateMessageRead(message);
+        }
+        operationsPerformed.getAndIncrement();
+
+        List<Control> opControls = message.getControls();
+
+        // FIXME -- See if there is a bind in progress. If so, then deny
+        // most kinds of operations.
+
+        // Figure out what type of operation we're dealing with based on the
+        // LDAP message. Abandon and unbind requests will be processed here.
+        // All other types of requests will be encapsulated into operations
+        // and append into the work queue to be picked up by a worker
+        // thread. Any other kinds of LDAP messages (e.g., response
+        // messages) are illegal and will result in the connection being
+        // terminated.
+        try {
+            if (bindInProgress.get()) {
+                throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_BIND_IN_PROGRESS.get());
+            } else if (startTLSInProgress.get()) {
+                throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_STARTTLS_IN_PROGRESS.get());
+            } else if (saslBindInProgress.get() && message.getProtocolOpType() != OP_TYPE_BIND_REQUEST) {
+                throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_SASLBIND_IN_PROGRESS.get());
+            }
+
+            boolean result;
+            switch (message.getProtocolOpType()) {
+            case OP_TYPE_ABANDON_REQUEST:
+                return processAbandonRequest(queueingStrategy, message, opControls, out);
+            case OP_TYPE_ADD_REQUEST:
+                return processAddRequest(queueingStrategy, message, opControls, out);
+            case OP_TYPE_BIND_REQUEST:
+                boolean isSaslBind =
+                    message.getBindRequestProtocolOp().getAuthenticationType() == AuthenticationType.SASL;
+                bindInProgress.set(true);
+                if (isSaslBind) {
+                    saslBindInProgress.set(true);
+                }
+                result = processBindRequest(queueingStrategy, message, opControls, out);
+                if (!result) {
+                    bindInProgress.set(false);
+                    if (isSaslBind) {
+                        saslBindInProgress.set(false);
+                    }
+                }
+                return result;
+            case OP_TYPE_COMPARE_REQUEST:
+                return processCompareRequest(queueingStrategy, message, opControls, out);
+            case OP_TYPE_DELETE_REQUEST:
+                return processDeleteRequest(queueingStrategy, message, opControls, out);
+            case OP_TYPE_EXTENDED_REQUEST:
+                boolean isStartTlsRequest = OID_START_TLS_REQUEST.equals(message.getExtendedRequestProtocolOp()
+                        .getOID());
+                if (isStartTlsRequest) {
+                    startTLSInProgress.set(true);
+                }
+                result = processExtendedRequest(queueingStrategy, message, opControls, out);
+                if (!result && isStartTlsRequest) {
+                    startTLSInProgress.set(false);
+                }
+                return result;
+            case OP_TYPE_MODIFY_REQUEST:
+                return processModifyRequest(queueingStrategy, message, opControls, out);
+            case OP_TYPE_MODIFY_DN_REQUEST:
+                return processModifyDNRequest(queueingStrategy, message, opControls, out);
+            case OP_TYPE_SEARCH_REQUEST:
+                return processSearchRequest(queueingStrategy, message, opControls, out);
+            case OP_TYPE_UNBIND_REQUEST:
+                return processUnbindRequest(message, opControls);
+            default:
+                LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_INVALID_REQUEST_TYPE.get(
+                        message.getProtocolOpName(), message.getMessageID());
+                disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
+                return false;
+            }
+        } catch (Exception e) {
+            logger.traceException(e);
+
+            LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_PROCESSING_FAILURE.get(message.getProtocolOpName(),
+                    message.getMessageID(), e);
+            disconnect(DisconnectReason.SERVER_ERROR, true, msg);
+            return false;
+        }
     }
-    operationsPerformed.getAndIncrement();
 
-    List<Control> opControls = message.getControls();
+    /**
+     * Processes the provided LDAP message as an abandon request.
+     *
+     * @param queueingStrategy
+     *            The {@link QueueingStrategy} to use for operation
+     * @param message
+     *            The LDAP message containing the abandon request to process.
+     * @param controls
+     *            The set of pre-decoded request controls contained in the message.
+     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
+     */
+    private boolean processAbandonRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+            final List<Control> controls, final FlowableEmitter<Response> out) {
+        if (ldapVersion == 2 && !controls.isEmpty()) {
+            disconnectControlsNotAllowed();
+            return false;
+        }
 
-    // FIXME -- See if there is a bind in progress. If so, then deny
-    // most kinds of operations.
+        // Create the abandon operation and add it into the work queue.
+        AbandonRequestProtocolOp protocolOp = message.getAbandonRequestProtocolOp();
+        AbandonOperationBasis abandonOp = new AbandonOperationBasis(this, nextOperationID.getAndIncrement(),
+                message.getMessageID(), controls, protocolOp.getIDToAbandon());
+        abandonOp.setAttachment(REACTIVE_OUT, out);
 
-    // Figure out what type of operation we're dealing with based on the
-    // LDAP message. Abandon and unbind requests will be processed here.
-    // All other types of requests will be encapsulated into operations
-    // and append into the work queue to be picked up by a worker
-    // thread. Any other kinds of LDAP messages (e.g., response
-    // messages) are illegal and will result in the connection being
-    // terminated.
-    try
-    {
-      if (bindInProgress.get())
-      {
-        throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_BIND_IN_PROGRESS.get());
-      }
-      else if (startTLSInProgress.get())
-      {
-        throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_STARTTLS_IN_PROGRESS.get());
-      }
-      else if (saslBindInProgress.get() && message.getProtocolOpType() != OP_TYPE_BIND_REQUEST)
-      {
-        throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_SASLBIND_IN_PROGRESS.get());
-      }
+        try {
+            addOperationInProgress(queueingStrategy, abandonOp);
+        } catch (DirectoryException de) {
+            logger.traceException(de);
 
-      boolean result;
-      switch (message.getProtocolOpType())
-      {
-      case OP_TYPE_ABANDON_REQUEST:
-        return processAbandonRequest(queueingStrategy, message, opControls, out);
-      case OP_TYPE_ADD_REQUEST:
-        return processAddRequest(queueingStrategy, message, opControls, out);
-      case OP_TYPE_BIND_REQUEST:
-        boolean isSaslBind = message.getBindRequestProtocolOp().getAuthenticationType() == AuthenticationType.SASL;
-        bindInProgress.set(true);
-        if (isSaslBind)
-        {
-          saslBindInProgress.set(true);
+            // Don't send an error response since abandon operations
+            // don't have a response.
         }
-        result = processBindRequest(queueingStrategy, message, opControls, out);
-        if(!result)
-        {
-          bindInProgress.set(false);
-          if (isSaslBind)
-          {
-            saslBindInProgress.set(false);
-          }
+
+        return connectionValid;
+    }
+
+    /**
+     * Processes the provided LDAP message as an add request.
+     *
+     * @param queueingStrategy
+     *            The {@link QueueingStrategy} to use for operation
+     * @param message
+     *            The LDAP message containing the add request to process.
+     * @param controls
+     *            The set of pre-decoded request controls contained in the message.
+     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
+     */
+    private boolean processAddRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+            final List<Control> controls, final FlowableEmitter<Response> out) {
+        if (ldapVersion == 2 && !controls.isEmpty()) {
+            // LDAPv2 clients aren't allowed to send controls.
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+                    ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+            out.onComplete();
+            disconnectControlsNotAllowed();
+            return false;
         }
-        return result;
-      case OP_TYPE_COMPARE_REQUEST:
-        return processCompareRequest(queueingStrategy, message, opControls, out);
-      case OP_TYPE_DELETE_REQUEST:
-        return processDeleteRequest(queueingStrategy, message, opControls, out);
-      case OP_TYPE_EXTENDED_REQUEST:
-        boolean isStartTlsRequest = OID_START_TLS_REQUEST.equals(message.getExtendedRequestProtocolOp().getOID());
-        if (isStartTlsRequest)
-        {
-          startTLSInProgress.set(true);
+
+        // Create the add operation and add it into the work queue.
+        AddRequestProtocolOp protocolOp = message.getAddRequestProtocolOp();
+        AddOperationBasis addOp = new AddOperationBasis(this, nextOperationID.getAndIncrement(),
+                message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getAttributes());
+        addOp.setAttachment(REACTIVE_OUT, out);
+
+        try {
+            addOperationInProgress(queueingStrategy, addOp);
+        } catch (DirectoryException de) {
+            logger.traceException(de);
+
+            final Result result = Responses.newResult(de.getResultCode())
+                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
+            for (String referral : de.getReferralURLs()) {
+                result.addReferralURI(referral);
+            }
+
+            out.onNext(result);
+            out.onComplete();
         }
-        result = processExtendedRequest(queueingStrategy, message, opControls, out);
-        if (!result && isStartTlsRequest)
-        {
-          startTLSInProgress.set(false);
+
+        return connectionValid;
+    }
+
+    private void disconnectControlsNotAllowed() {
+        disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
+    }
+
+    /**
+     * Processes the provided LDAP message as a bind request.
+     *
+     * @param queueingStrategy
+     *            The {@link QueueingStrategy} to use for operation
+     * @param message
+     *            The LDAP message containing the bind request to process.
+     * @param controls
+     *            The set of pre-decoded request controls contained in the message.
+     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
+     */
+    private boolean processBindRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+            final List<Control> controls, final FlowableEmitter<Response> out) {
+        BindRequestProtocolOp protocolOp = message.getBindRequestProtocolOp();
+
+        // See if this is an LDAPv2 bind request, and if so whether that
+        // should be allowed.
+        String versionString;
+        switch (ldapVersion = protocolOp.getProtocolVersion()) {
+        case 2:
+            versionString = "2";
+
+            if (!connectionHandler.allowLDAPv2()) {
+                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+                        ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+                out.onComplete();
+                disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
+                return false;
+            }
+
+            if (!controls.isEmpty()) {
+                // LDAPv2 clients aren't allowed to send controls.
+                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+                        ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+                out.onComplete();
+                disconnectControlsNotAllowed();
+                return false;
+            }
+
+            break;
+        case 3:
+            versionString = "3";
+            break;
+        default:
+            // Unsupported protocol version. RFC4511 states that we MUST send
+            // a protocol error back to the client.
+            out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+                    ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion).toString()));
+            out.onComplete();
+            disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion));
+            return false;
         }
-        return result;
-      case OP_TYPE_MODIFY_REQUEST:
-        return processModifyRequest(queueingStrategy, message, opControls, out);
-      case OP_TYPE_MODIFY_DN_REQUEST:
-        return processModifyDNRequest(queueingStrategy, message, opControls, out);
-      case OP_TYPE_SEARCH_REQUEST:
-        return processSearchRequest(queueingStrategy, message, opControls, out);
-      case OP_TYPE_UNBIND_REQUEST:
-        return processUnbindRequest(message, opControls);
-      default:
-        LocalizableMessage msg =
-            ERR_LDAP_DISCONNECT_DUE_TO_INVALID_REQUEST_TYPE.get(message
-                .getProtocolOpName(), message.getMessageID());
-        disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
+
+        ByteString bindDN = protocolOp.getDN();
+
+        BindOperationBasis bindOp;
+        switch (protocolOp.getAuthenticationType()) {
+        case SIMPLE:
+            bindOp = new BindOperationBasis(this, nextOperationID.getAndIncrement(), message.getMessageID(), controls,
+                    versionString, bindDN, protocolOp.getSimplePassword());
+            break;
+        case SASL:
+            bindOp = new BindOperationBasis(this, nextOperationID.getAndIncrement(), message.getMessageID(), controls,
+                    versionString, bindDN, protocolOp.getSASLMechanism(), protocolOp.getSASLCredentials());
+            break;
+        default:
+            // This is an invalid authentication type, and therefore a
+            // protocol error. As per RFC 2251, a protocol error in a bind
+            // request must result in terminating the connection.
+            LocalizableMessage msg = ERR_LDAP_INVALID_BIND_AUTH_TYPE.get(message.getMessageID(),
+                    protocolOp.getAuthenticationType());
+            disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
+            return false;
+        }
+
+        // Add the operation into the work queue.
+        bindOp.setAttachment(REACTIVE_OUT, out);
+        try {
+            addOperationInProgress(queueingStrategy, bindOp);
+        } catch (DirectoryException de) {
+            logger.traceException(de);
+
+            final Result result = Responses.newBindResult(de.getResultCode())
+                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
+            for (String referral : de.getReferralURLs()) {
+                result.addReferralURI(referral);
+            }
+
+            out.onNext(result);
+            out.onComplete();
+
+            // If it was a protocol error, then terminate the connection.
+            if (de.getResultCode() == ResultCode.PROTOCOL_ERROR) {
+                LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_BIND_PROTOCOL_ERROR.get(message.getMessageID(),
+                        de.getMessageObject());
+                disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
+            }
+        }
+
+        return connectionValid;
+    }
+
+    /**
+     * Processes the provided LDAP message as a compare request.
+     *
+     * @param queueingStrategy
+     *            The {@link QueueingStrategy} to use for operation
+     * @param message
+     *            The LDAP message containing the compare request to process.
+     * @param controls
+     *            The set of pre-decoded request controls contained in the message.
+     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
+     */
+    private boolean processCompareRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+            final List<Control> controls, final FlowableEmitter<Response> out) {
+        if (ldapVersion == 2 && !controls.isEmpty()) {
+            // LDAPv2 clients aren't allowed to send controls.
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onComplete();
+            disconnectControlsNotAllowed();
+            return false;
+        }
+
+        CompareRequestProtocolOp protocolOp = message.getCompareRequestProtocolOp();
+        CompareOperationBasis compareOp = new CompareOperationBasis(this, nextOperationID.getAndIncrement(),
+                message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getAttributeType(),
+                protocolOp.getAssertionValue());
+
+        // Add the operation into the work queue.
+        compareOp.setAttachment(REACTIVE_OUT, out);
+        try {
+            addOperationInProgress(queueingStrategy, compareOp);
+        } catch (DirectoryException de) {
+            logger.traceException(de);
+
+            final CompareResult result = Responses.newCompareResult(de.getResultCode())
+                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
+            result.getReferralURIs().addAll(de.getReferralURLs());
+            out.onNext(result);
+            out.onComplete();
+        }
+
+        return connectionValid;
+    }
+
+    /**
+     * Processes the provided LDAP message as a delete request.
+     *
+     * @param queueingStrategy
+     *            The {@link QueueingStrategy} to use for operation
+     * @param message
+     *            The LDAP message containing the delete request to process.
+     * @param controls
+     *            The set of pre-decoded request controls contained in the message.
+     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
+     */
+    private boolean processDeleteRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+            final List<Control> controls, final FlowableEmitter<Response> out) {
+        if (ldapVersion == 2 && !controls.isEmpty()) {
+            // LDAPv2 clients aren't allowed to send controls.
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onComplete();
+            disconnectControlsNotAllowed();
+            return false;
+        }
+
+        DeleteRequestProtocolOp protocolOp = message.getDeleteRequestProtocolOp();
+        DeleteOperationBasis deleteOp = new DeleteOperationBasis(this, nextOperationID.getAndIncrement(),
+                message.getMessageID(), controls, protocolOp.getDN());
+
+        // Add the operation into the work queue.
+        deleteOp.setAttachment(REACTIVE_OUT, out);
+        try {
+            addOperationInProgress(queueingStrategy, deleteOp);
+        } catch (DirectoryException de) {
+            logger.traceException(de);
+
+            final Result result = Responses.newResult(de.getResultCode())
+                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
+            result.getReferralURIs().addAll(de.getReferralURLs());
+
+            out.onNext(result);
+            out.onComplete();
+        }
+
+        return connectionValid;
+    }
+
+    /**
+     * Processes the provided LDAP message as an extended request.
+     *
+     * @param queueingStrategy
+     *            The {@link QueueingStrategy} to use for operation
+     * @param message
+     *            The LDAP message containing the extended request to process.
+     * @param controls
+     *            The set of pre-decoded request controls contained in the message.
+     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
+     */
+    private boolean processExtendedRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+            final List<Control> controls, final FlowableEmitter<Response> out) {
+        // See if this is an LDAPv2 client. If it is, then they should not
+        // be issuing extended requests. We can't send a response that we
+        // can be sure they can understand, so we have no choice but to
+        // close the connection.
+        if (ldapVersion == 2) {
+            // LDAPv2 clients aren't allowed to send controls.
+            LocalizableMessage msg = ERR_LDAPV2_EXTENDED_REQUEST_NOT_ALLOWED.get(getConnectionID(),
+                    message.getMessageID());
+
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(msg.toString()));
+            out.onComplete();
+
+            logger.error(msg);
+            disconnect(DisconnectReason.PROTOCOL_ERROR, false, msg);
+            return false;
+        }
+
+        // FIXME -- Do we need to handle certain types of request here?
+        // -- StartTLS requests
+        // -- Cancel requests
+
+        ExtendedRequestProtocolOp protocolOp = message.getExtendedRequestProtocolOp();
+        ExtendedOperationBasis extendedOp = new ExtendedOperationBasis(this, nextOperationID.getAndIncrement(),
+                message.getMessageID(), controls, protocolOp.getOID(), protocolOp.getValue());
+
+        // Add the operation into the work queue.
+        extendedOp.setAttachment(REACTIVE_OUT, out);
+        try {
+            addOperationInProgress(queueingStrategy, extendedOp);
+        } catch (DirectoryException de) {
+            logger.traceException(de);
+            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
+                    .setMatchedDN(de.getMatchedDN().toString());
+            result.getReferralURIs().addAll(de.getReferralURLs());
+
+            out.onNext(result);
+            out.onComplete();
+        }
+
+        return connectionValid;
+    }
+
+    /**
+     * Processes the provided LDAP message as a modify request.
+     *
+     * @param queueingStrategy
+     *            The {@link QueueingStrategy} to use for operation
+     * @param message
+     *            The LDAP message containing the modify request to process.
+     * @param controls
+     *            The set of pre-decoded request controls contained in the message.
+     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
+     */
+    private boolean processModifyRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+            final List<Control> controls, final FlowableEmitter<Response> out) {
+        if (ldapVersion == 2 && !controls.isEmpty()) {
+            // LDAPv2 clients aren't allowed to send controls.
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onComplete();
+            disconnectControlsNotAllowed();
+            return false;
+        }
+
+        ModifyRequestProtocolOp protocolOp = message.getModifyRequestProtocolOp();
+        ModifyOperationBasis modifyOp = new ModifyOperationBasis(this, nextOperationID.getAndIncrement(),
+                message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getModifications());
+
+        // Add the operation into the work queue.
+        modifyOp.setAttachment(REACTIVE_OUT, out);
+        try {
+            addOperationInProgress(queueingStrategy, modifyOp);
+        } catch (DirectoryException de) {
+            logger.traceException(de);
+            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
+                    .setMatchedDN(de.getMatchedDN().toString());
+            result.getReferralURIs().addAll(de.getReferralURLs());
+
+            out.onNext(result);
+            out.onComplete();
+        }
+
+        return connectionValid;
+    }
+
+    /**
+     * Processes the provided LDAP message as a modify DN request.
+     *
+     * @param queueingStrategy
+     *            The {@link QueueingStrategy} to use for operation
+     * @param message
+     *            The LDAP message containing the modify DN request to process.
+     * @param controls
+     *            The set of pre-decoded request controls contained in the message.
+     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
+     */
+    private boolean processModifyDNRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+            final List<Control> controls, final FlowableEmitter<Response> out) {
+        if (ldapVersion == 2 && !controls.isEmpty()) {
+            // LDAPv2 clients aren't allowed to send controls.
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onComplete();
+            disconnectControlsNotAllowed();
+            return false;
+        }
+
+        ModifyDNRequestProtocolOp protocolOp = message.getModifyDNRequestProtocolOp();
+        ModifyDNOperationBasis modifyDNOp = new ModifyDNOperationBasis(this, nextOperationID.getAndIncrement(),
+                message.getMessageID(), controls, protocolOp.getEntryDN(), protocolOp.getNewRDN(),
+                protocolOp.deleteOldRDN(), protocolOp.getNewSuperior());
+
+        // Add the operation into the work queue.
+        modifyDNOp.setAttachment(REACTIVE_OUT, out);
+        try {
+            addOperationInProgress(queueingStrategy, modifyDNOp);
+        } catch (DirectoryException de) {
+            logger.traceException(de);
+
+            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
+                    .setMatchedDN(de.getMatchedDN().toString());
+            result.getReferralURIs().addAll(de.getReferralURLs());
+            for (Control control : modifyDNOp.getResponseControls()) {
+                result.addControl(Converters.from(control));
+            }
+            out.onNext(result);
+            out.onComplete();
+        }
+
+        return connectionValid;
+    }
+
+    /**
+     * Processes the provided LDAP message as a search request.
+     *
+     * @param queueingStrategy
+     *            The {@link QueueingStrategy} to use for operation
+     * @param message
+     *            The LDAP message containing the search request to process.
+     * @param controls
+     *            The set of pre-decoded request controls contained in the message.
+     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
+     */
+    private boolean processSearchRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
+            final List<Control> controls, final FlowableEmitter<Response> out) {
+        if (ldapVersion == 2 && !controls.isEmpty()) {
+            // LDAPv2 clients aren't allowed to send controls.
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
+                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onComplete();
+            disconnectControlsNotAllowed();
+            return false;
+        }
+
+        SearchRequestProtocolOp protocolOp = message.getSearchRequestProtocolOp();
+        SearchOperationBasis searchOp = new SearchOperationBasis(this, nextOperationID.getAndIncrement(),
+                message.getMessageID(), controls, protocolOp.getBaseDN(), protocolOp.getScope(),
+                protocolOp.getDereferencePolicy(), protocolOp.getSizeLimit(), protocolOp.getTimeLimit(),
+                protocolOp.getTypesOnly(), protocolOp.getFilter(), protocolOp.getAttributes());
+
+        // Add the operation into the work queue.
+        searchOp.setAttachment(REACTIVE_OUT, out);
+        try {
+            addOperationInProgress(queueingStrategy, searchOp);
+        } catch (DirectoryException de) {
+            logger.traceException(de);
+
+            final Result result = Responses.newResult(de.getResultCode());
+            if (de.getMessage() != null) {
+                result.setDiagnosticMessage(de.getMessage());
+            }
+            if (de.getMatchedDN() != null) {
+                result.setMatchedDN(de.getMatchedDN().toString());
+            }
+            if (de.getReferralURLs() != null) {
+                result.getReferralURIs().addAll(de.getReferralURLs());
+            }
+            if (searchOp.getResponseControls() != null) {
+                for (Control control : searchOp.getResponseControls()) {
+                    result.addControl(Converters.from(control));
+                }
+            }
+            out.onNext(result);
+            out.onComplete();
+        }
+
+        return connectionValid;
+    }
+
+    /**
+     * Processes the provided LDAP message as an unbind request.
+     *
+     * @param message
+     *            The LDAP message containing the unbind request to process.
+     * @param controls
+     *            The set of pre-decoded request controls contained in the message.
+     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
+     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
+     */
+    private boolean processUnbindRequest(final LDAPMessage message, final List<Control> controls) {
+        UnbindOperationBasis unbindOp = new UnbindOperationBasis(this, nextOperationID.getAndIncrement(),
+                message.getMessageID(), controls);
+
+        unbindOp.run();
+
+        // The client connection will never be valid after an unbind.
         return false;
-      }
-    }
-    catch (Exception e)
-    {
-      logger.traceException(e);
-
-      LocalizableMessage msg =
-          ERR_LDAP_DISCONNECT_DUE_TO_PROCESSING_FAILURE.get(message
-              .getProtocolOpName(), message.getMessageID(), e);
-      disconnect(DisconnectReason.SERVER_ERROR, true, msg);
-      return false;
-    }
-  }
-
-  /**
-   * Processes the provided LDAP message as an abandon request.
-   *
-   * @param queueingStrategy
-   *          The {@link QueueingStrategy} to use for operation
-   * @param message
-   *          The LDAP message containing the abandon request to
-   *          process.
-   * @param controls
-   *          The set of pre-decoded request controls contained in the
-   *          message.
-   * @return <CODE>true</CODE> if the request was processed
-   *         successfully, or <CODE>false</CODE> if not and the
-   *         connection has been closed as a result (it is the
-   *         responsibility of this method to close the connection).
-   */
-  private boolean processAbandonRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
-          final List<Control> controls, final FlowableEmitter<Response> out)
-  {
-    if (ldapVersion == 2 && !controls.isEmpty())
-    {
-      disconnectControlsNotAllowed();
-      return false;
     }
 
-    // Create the abandon operation and add it into the work queue.
-    AbandonRequestProtocolOp protocolOp =
-        message.getAbandonRequestProtocolOp();
-    AbandonOperationBasis abandonOp =
-        new AbandonOperationBasis(this, nextOperationID
-            .getAndIncrement(), message.getMessageID(), controls,
-            protocolOp.getIDToAbandon());
-    abandonOp.setAttachment(REACTIVE_OUT, out);
+    @Override
+    public String getMonitorSummary() {
+        StringBuilder buffer = new StringBuilder();
+        buffer.append("connID=\"");
+        buffer.append(connectionID);
+        buffer.append("\" connectTime=\"");
+        buffer.append(getConnectTimeString());
+        buffer.append("\" source=\"");
+        buffer.append(clientAddress);
+        buffer.append(":");
+        buffer.append(clientPort);
+        buffer.append("\" destination=\"");
+        buffer.append(serverAddress);
+        buffer.append(":");
+        buffer.append(connectionHandler.getListeners().iterator().next().getPort());
+        buffer.append("\" ldapVersion=\"");
+        buffer.append(ldapVersion);
+        buffer.append("\" authDN=\"");
 
-    try
-    {
-      addOperationInProgress(queueingStrategy, abandonOp);
-    }
-    catch (DirectoryException de)
-    {
-      logger.traceException(de);
-
-      // Don't send an error response since abandon operations
-      // don't have a response.
-    }
-
-    return connectionValid;
-  }
-
-  /**
-   * Processes the provided LDAP message as an add request.
-   *
-   * @param queueingStrategy
-   *          The {@link QueueingStrategy} to use for operation
-   * @param message
-   *          The LDAP message containing the add request to process.
-   * @param controls
-   *          The set of pre-decoded request controls contained in the
-   *          message.
-   * @return <CODE>true</CODE> if the request was processed
-   *         successfully, or <CODE>false</CODE> if not and the
-   *         connection has been closed as a result (it is the
-   *         responsibility of this method to close the connection).
-   */
-  private boolean processAddRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
-          final List<Control> controls, final FlowableEmitter<Response> out)
-  {
-    if (ldapVersion == 2 && !controls.isEmpty())
-    {
-      // LDAPv2 clients aren't allowed to send controls.
-      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
-                          .setDiagnosticMessage(ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
-      out.onComplete();
-      disconnectControlsNotAllowed();
-      return false;
-    }
-
-    // Create the add operation and add it into the work queue.
-    AddRequestProtocolOp protocolOp = message.getAddRequestProtocolOp();
-    AddOperationBasis addOp =
-        new AddOperationBasis(this, nextOperationID.getAndIncrement(),
-            message.getMessageID(), controls, protocolOp.getDN(),
-            protocolOp.getAttributes());
-    addOp.setAttachment(REACTIVE_OUT, out);
-
-    try
-    {
-      addOperationInProgress(queueingStrategy, addOp);
-    }
-    catch (DirectoryException de)
-    {
-      logger.traceException(de);
-
-      final Result result = Responses.newResult(de.getResultCode())
-          .setDiagnosticMessage(de.getLocalizedMessage())
-          .setMatchedDN(de.getMatchedDN().toString());
-      for(String referral : de.getReferralURLs()) {
-        result.addReferralURI(referral);
-      }
-
-      out.onNext(result);
-      out.onComplete();
-    }
-
-    return connectionValid;
-  }
-
-  private void disconnectControlsNotAllowed()
-  {
-    disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
-  }
-
-  /**
-   * Processes the provided LDAP message as a bind request.
-   *
-   * @param queueingStrategy
-   *          The {@link QueueingStrategy} to use for operation
-   * @param message
-   *          The LDAP message containing the bind request to process.
-   * @param controls
-   *          The set of pre-decoded request controls contained in the
-   *          message.
-   * @return <CODE>true</CODE> if the request was processed
-   *         successfully, or <CODE>false</CODE> if not and the
-   *         connection has been closed as a result (it is the
-   *         responsibility of this method to close the connection).
-   */
-  private boolean processBindRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
-          final List<Control> controls, final FlowableEmitter<Response> out)
-  {
-    BindRequestProtocolOp protocolOp =
-        message.getBindRequestProtocolOp();
-
-    // See if this is an LDAPv2 bind request, and if so whether that
-    // should be allowed.
-    String versionString;
-    switch (ldapVersion = protocolOp.getProtocolVersion())
-    {
-    case 2:
-      versionString = "2";
-
-      if (!connectionHandler.allowLDAPv2())
-      {
-        out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
-                            .setDiagnosticMessage(ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
-        out.onComplete();
-        disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
-        return false;
-      }
-
-      if (!controls.isEmpty())
-      {
-        // LDAPv2 clients aren't allowed to send controls.
-        out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
-            .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
-        out.onComplete();
-        disconnectControlsNotAllowed();
-        return false;
-      }
-
-      break;
-    case 3:
-      versionString = "3";
-      break;
-    default:
-      // Unsupported protocol version. RFC4511 states that we MUST send
-      // a protocol error back to the client.
-      out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
-                          .setDiagnosticMessage(ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion).toString()));
-      out.onComplete();
-      disconnect(DisconnectReason.PROTOCOL_ERROR, false,
-          ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion));
-      return false;
-    }
-
-    ByteString bindDN = protocolOp.getDN();
-
-    BindOperationBasis bindOp;
-    switch (protocolOp.getAuthenticationType())
-    {
-    case SIMPLE:
-      bindOp =
-          new BindOperationBasis(this, nextOperationID
-              .getAndIncrement(), message.getMessageID(), controls,
-              versionString, bindDN, protocolOp.getSimplePassword());
-      break;
-    case SASL:
-      bindOp =
-          new BindOperationBasis(this, nextOperationID
-              .getAndIncrement(), message.getMessageID(), controls,
-              versionString, bindDN, protocolOp.getSASLMechanism(),
-              protocolOp.getSASLCredentials());
-      break;
-    default:
-      // This is an invalid authentication type, and therefore a
-      // protocol error. As per RFC 2251, a protocol error in a bind
-      // request must result in terminating the connection.
-      LocalizableMessage msg =
-          ERR_LDAP_INVALID_BIND_AUTH_TYPE.get(message.getMessageID(),
-              protocolOp.getAuthenticationType());
-      disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
-      return false;
-    }
-
-    // Add the operation into the work queue.
-    bindOp.setAttachment(REACTIVE_OUT, out);
-    try
-    {
-      addOperationInProgress(queueingStrategy, bindOp);
-    }
-    catch (DirectoryException de)
-    {
-      logger.traceException(de);
-
-      final Result result = Responses.newBindResult(de.getResultCode())
-                                                      .setDiagnosticMessage(de.getLocalizedMessage())
-                                                      .setMatchedDN(de.getMatchedDN().toString());
-      for(String referral : de.getReferralURLs())
-      {
-        result.addReferralURI(referral);
-      }
-
-      out.onNext(result);
-      out.onComplete();
-
-      // If it was a protocol error, then terminate the connection.
-      if (de.getResultCode() == ResultCode.PROTOCOL_ERROR)
-      {
-        LocalizableMessage msg =
-            ERR_LDAP_DISCONNECT_DUE_TO_BIND_PROTOCOL_ERROR.get(message
-                .getMessageID(), de.getMessageObject());
-        disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
-      }
-    }
-
-    return connectionValid;
-  }
-
-  /**
-   * Processes the provided LDAP message as a compare request.
-   *
-   * @param queueingStrategy
-   *          The {@link QueueingStrategy} to use for operation
-   * @param message
-   *          The LDAP message containing the compare request to
-   *          process.
-   * @param controls
-   *          The set of pre-decoded request controls contained in the
-   *          message.
-   * @return <CODE>true</CODE> if the request was processed
-   *         successfully, or <CODE>false</CODE> if not and the
-   *         connection has been closed as a result (it is the
-   *         responsibility of this method to close the connection).
-   */
-  private boolean processCompareRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
-          final List<Control> controls, final FlowableEmitter<Response> out)
-  {
-    if (ldapVersion == 2 && !controls.isEmpty())
-    {
-      // LDAPv2 clients aren't allowed to send controls.
-      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
-                          .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
-      out.onComplete();
-      disconnectControlsNotAllowed();
-      return false;
-    }
-
-    CompareRequestProtocolOp protocolOp =
-        message.getCompareRequestProtocolOp();
-    CompareOperationBasis compareOp =
-        new CompareOperationBasis(this, nextOperationID
-            .getAndIncrement(), message.getMessageID(), controls,
-            protocolOp.getDN(), protocolOp.getAttributeType(),
-            protocolOp.getAssertionValue());
-
-    // Add the operation into the work queue.
-    compareOp.setAttachment(REACTIVE_OUT, out);
-    try
-    {
-      addOperationInProgress(queueingStrategy, compareOp);
-    }
-    catch (DirectoryException de)
-    {
-      logger.traceException(de);
-
-      final CompareResult result = Responses.newCompareResult(de.getResultCode())
-                                            .setDiagnosticMessage(de.getLocalizedMessage())
-                                            .setMatchedDN(de.getMatchedDN().toString());
-      result.getReferralURIs().addAll(de.getReferralURLs());
-      out.onNext(result);
-      out.onComplete();
-    }
-
-    return connectionValid;
-  }
-
-  /**
-   * Processes the provided LDAP message as a delete request.
-   * @param queueingStrategy
-   *          The {@link QueueingStrategy} to use for operation
-   * @param message
-   *          The LDAP message containing the delete request to process.
-   * @param controls
-   *          The set of pre-decoded request controls contained in the
-   *          message.
-   * @return <CODE>true</CODE> if the request was processed
-   *         successfully, or <CODE>false</CODE> if not and the
-   *         connection has been closed as a result (it is the
-   *         responsibility of this method to close the connection).
-   */
-  private boolean processDeleteRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
-          final List<Control> controls, final FlowableEmitter<Response> out)
-  {
-    if (ldapVersion == 2 && !controls.isEmpty())
-    {
-      // LDAPv2 clients aren't allowed to send controls.
-      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
-                          .setDiagnosticMessage( ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
-      out.onComplete();
-      disconnectControlsNotAllowed();
-      return false;
-    }
-
-    DeleteRequestProtocolOp protocolOp =
-        message.getDeleteRequestProtocolOp();
-    DeleteOperationBasis deleteOp =
-        new DeleteOperationBasis(this, nextOperationID
-            .getAndIncrement(), message.getMessageID(), controls,
-            protocolOp.getDN());
-
-    // Add the operation into the work queue.
-    deleteOp.setAttachment(REACTIVE_OUT, out);
-    try
-    {
-      addOperationInProgress(queueingStrategy, deleteOp);
-    }
-    catch (DirectoryException de)
-    {
-      logger.traceException(de);
-
-      final Result result = Responses.newResult(de.getResultCode())
-                                     .setDiagnosticMessage(de.getLocalizedMessage())
-                                     .setMatchedDN(de.getMatchedDN().toString());
-      result.getReferralURIs().addAll(de.getReferralURLs());
-
-      out.onNext(result);
-      out.onComplete();
-    }
-
-    return connectionValid;
-  }
-
-  /**
-   * Processes the provided LDAP message as an extended request.
-   * @param queueingStrategy
-   *          The {@link QueueingStrategy} to use for operation
-   * @param message
-   *          The LDAP message containing the extended request to
-   *          process.
-   * @param controls
-   *          The set of pre-decoded request controls contained in the
-   *          message.
-   * @return <CODE>true</CODE> if the request was processed
-   *         successfully, or <CODE>false</CODE> if not and the
-   *         connection has been closed as a result (it is the
-   *         responsibility of this method to close the connection).
-   */
-  private boolean processExtendedRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
-      final List<Control> controls, final FlowableEmitter<Response> out)
-  {
-    // See if this is an LDAPv2 client. If it is, then they should not
-    // be issuing extended requests. We can't send a response that we
-    // can be sure they can understand, so we have no choice but to
-    // close the connection.
-    if (ldapVersion == 2)
-    {
-      // LDAPv2 clients aren't allowed to send controls.
-      LocalizableMessage msg = ERR_LDAPV2_EXTENDED_REQUEST_NOT_ALLOWED.get(getConnectionID(), message.getMessageID());
-
-      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
-                          .setDiagnosticMessage(msg.toString()));
-      out.onComplete();
-
-      logger.error(msg);
-      disconnect(DisconnectReason.PROTOCOL_ERROR, false, msg);
-      return false;
-    }
-
-    // FIXME -- Do we need to handle certain types of request here?
-    // -- StartTLS requests
-    // -- Cancel requests
-
-    ExtendedRequestProtocolOp protocolOp =
-        message.getExtendedRequestProtocolOp();
-    ExtendedOperationBasis extendedOp =
-        new ExtendedOperationBasis(this, nextOperationID
-            .getAndIncrement(), message.getMessageID(), controls,
-            protocolOp.getOID(), protocolOp.getValue());
-
-    // Add the operation into the work queue.
-    extendedOp.setAttachment(REACTIVE_OUT, out);
-    try
-    {
-      addOperationInProgress(queueingStrategy, extendedOp);
-    }
-    catch (DirectoryException de)
-    {
-      logger.traceException(de);
-      final Result result = Responses.newResult(de.getResultCode())
-                                     .setDiagnosticMessage(de.getMessage())
-                                     .setMatchedDN(de.getMatchedDN().toString());
-      result.getReferralURIs().addAll(de.getReferralURLs());
-
-      out.onNext(result);
-      out.onComplete();
-    }
-
-    return connectionValid;
-  }
-
-  /**
-   * Processes the provided LDAP message as a modify request.
-   *
-   * @param queueingStrategy
-   *          The {@link QueueingStrategy} to use for operation
-   * @param message
-   *          The LDAP message containing the modify request to process.
-   * @param controls
-   *          The set of pre-decoded request controls contained in the
-   *          message.
-   * @return <CODE>true</CODE> if the request was processed
-   *         successfully, or <CODE>false</CODE> if not and the
-   *         connection has been closed as a result (it is the
-   *         responsibility of this method to close the connection).
-   */
-  private boolean processModifyRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
-          final List<Control> controls, final FlowableEmitter<Response> out)
-  {
-    if (ldapVersion == 2 && !controls.isEmpty())
-    {
-      // LDAPv2 clients aren't allowed to send controls.
-      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
-                          .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
-      out.onComplete();
-      disconnectControlsNotAllowed();
-      return false;
-    }
-
-    ModifyRequestProtocolOp protocolOp =
-        message.getModifyRequestProtocolOp();
-    ModifyOperationBasis modifyOp =
-        new ModifyOperationBasis(this, nextOperationID
-            .getAndIncrement(), message.getMessageID(), controls,
-            protocolOp.getDN(), protocolOp.getModifications());
-
-    // Add the operation into the work queue.
-    modifyOp.setAttachment(REACTIVE_OUT, out);
-    try
-    {
-      addOperationInProgress(queueingStrategy, modifyOp);
-    }
-    catch (DirectoryException de)
-    {
-      logger.traceException(de);
-      final Result result =
-          Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage()).setMatchedDN(de.getMatchedDN()
-              .toString());
-      result.getReferralURIs().addAll(de.getReferralURLs());
-
-      out.onNext(result);
-      out.onComplete();
-    }
-
-    return connectionValid;
-  }
-
-  /**
-   * Processes the provided LDAP message as a modify DN request.
-   *
-   * @param queueingStrategy
-   *          The {@link QueueingStrategy} to use for operation
-   * @param message
-   *          The LDAP message containing the modify DN request to
-   *          process.
-   * @param controls
-   *          The set of pre-decoded request controls contained in the
-   *          message.
-   * @return <CODE>true</CODE> if the request was processed
-   *         successfully, or <CODE>false</CODE> if not and the
-   *         connection has been closed as a result (it is the
-   *         responsibility of this method to close the connection).
-   */
-  private boolean processModifyDNRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
-          final List<Control> controls, final FlowableEmitter<Response> out)
-  {
-    if (ldapVersion == 2 && !controls.isEmpty())
-    {
-      // LDAPv2 clients aren't allowed to send controls.
-      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
-          .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
-      out.onComplete();
-      disconnectControlsNotAllowed();
-      return false;
-    }
-
-    ModifyDNRequestProtocolOp protocolOp =
-        message.getModifyDNRequestProtocolOp();
-    ModifyDNOperationBasis modifyDNOp =
-        new ModifyDNOperationBasis(this, nextOperationID
-            .getAndIncrement(), message.getMessageID(), controls,
-            protocolOp.getEntryDN(), protocolOp.getNewRDN(), protocolOp
-                .deleteOldRDN(), protocolOp.getNewSuperior());
-
-    // Add the operation into the work queue.
-    modifyDNOp.setAttachment(REACTIVE_OUT, out);
-    try
-    {
-      addOperationInProgress(queueingStrategy, modifyDNOp);
-    }
-    catch (DirectoryException de)
-    {
-      logger.traceException(de);
-
-      final Result result =
-          Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage()).setMatchedDN(de.getMatchedDN()
-              .toString());
-      result.getReferralURIs().addAll(de.getReferralURLs());
-      for(Control control : modifyDNOp.getResponseControls()) {
-        result.addControl(Converters.from(control));
-      }
-      out.onNext(result);
-      out.onComplete();
-    }
-
-    return connectionValid;
-  }
-
-  /**
-   * Processes the provided LDAP message as a search request.
-   *
-   * @param queueingStrategy
-   *          The {@link QueueingStrategy} to use for operation
-   * @param message
-   *          The LDAP message containing the search request to process.
-   * @param controls
-   *          The set of pre-decoded request controls contained in the
-   *          message.
-   * @return <CODE>true</CODE> if the request was processed
-   *         successfully, or <CODE>false</CODE> if not and the
-   *         connection has been closed as a result (it is the
-   *         responsibility of this method to close the connection).
-   */
-  private boolean processSearchRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
-          final List<Control> controls, final FlowableEmitter<Response> out)
-  {
-    if (ldapVersion == 2 && !controls.isEmpty())
-    {
-      // LDAPv2 clients aren't allowed to send controls.
-      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
-          .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
-      out.onComplete();
-      disconnectControlsNotAllowed();
-      return false;
-    }
-
-    SearchRequestProtocolOp protocolOp =
-        message.getSearchRequestProtocolOp();
-    SearchOperationBasis searchOp =
-        new SearchOperationBasis(this, nextOperationID
-            .getAndIncrement(), message.getMessageID(), controls,
-            protocolOp.getBaseDN(), protocolOp.getScope(), protocolOp
-                .getDereferencePolicy(), protocolOp.getSizeLimit(),
-            protocolOp.getTimeLimit(), protocolOp.getTypesOnly(),
-            protocolOp.getFilter(), protocolOp.getAttributes());
-
-    // Add the operation into the work queue.
-    searchOp.setAttachment(REACTIVE_OUT, out);
-    try
-    {
-      addOperationInProgress(queueingStrategy, searchOp);
-    }
-    catch (DirectoryException de)
-    {
-      logger.traceException(de);
-
-      final Result result = Responses.newResult(de.getResultCode());
-      if (de.getMessage() != null)
-      {
-        result.setDiagnosticMessage(de.getMessage());
-      }
-      if (de.getMatchedDN() != null)
-      {
-        result.setMatchedDN(de.getMatchedDN().toString());
-      }
-      if (de.getReferralURLs() != null)
-      {
-        result.getReferralURIs().addAll(de.getReferralURLs());
-      }
-      if (searchOp.getResponseControls() != null)
-      {
-        for (Control control : searchOp.getResponseControls())
-        {
-          result.addControl(Converters.from(control));
+        DN authDN = getAuthenticationInfo().getAuthenticationDN();
+        if (authDN != null) {
+            buffer.append(authDN);
         }
-      }
-      out.onNext(result);
-      out.onComplete();
+
+        buffer.append("\" security=\"");
+        buffer.append("none");
+
+        buffer.append("\" opsInProgress=\"");
+        buffer.append(operationsInProgress.size());
+        buffer.append("\"");
+
+        int countPSearch = getPersistentSearches().size();
+        if (countPSearch > 0) {
+            buffer.append(" persistentSearches=\"");
+            buffer.append(countPSearch);
+            buffer.append("\"");
+        }
+        return buffer.toString();
     }
 
-    return connectionValid;
-  }
-
-  /**
-   * Processes the provided LDAP message as an unbind request.
-   *
-   * @param message
-   *          The LDAP message containing the unbind request to process.
-   * @param controls
-   *          The set of pre-decoded request controls contained in the
-   *          message.
-   * @return <CODE>true</CODE> if the request was processed
-   *         successfully, or <CODE>false</CODE> if not and the
-   *         connection has been closed as a result (it is the
-   *         responsibility of this method to close the connection).
-   */
-  private boolean processUnbindRequest(final LDAPMessage message, final List<Control> controls)
-  {
-    UnbindOperationBasis unbindOp =
-        new UnbindOperationBasis(this, nextOperationID
-            .getAndIncrement(), message.getMessageID(), controls);
-
-    unbindOp.run();
-
-    // The client connection will never be valid after an unbind.
-    return false;
-  }
-
-  @Override
-  public String getMonitorSummary()
-  {
-    StringBuilder buffer = new StringBuilder();
-    buffer.append("connID=\"");
-    buffer.append(connectionID);
-    buffer.append("\" connectTime=\"");
-    buffer.append(getConnectTimeString());
-    buffer.append("\" source=\"");
-    buffer.append(clientAddress);
-    buffer.append(":");
-    buffer.append(clientPort);
-    buffer.append("\" destination=\"");
-    buffer.append(serverAddress);
-    buffer.append(":");
-    buffer.append(connectionHandler.getListeners().iterator().next().getPort());
-    buffer.append("\" ldapVersion=\"");
-    buffer.append(ldapVersion);
-    buffer.append("\" authDN=\"");
-
-    DN authDN = getAuthenticationInfo().getAuthenticationDN();
-    if (authDN != null)
-    {
-      buffer.append(authDN);
+    /**
+     * Appends a string representation of this client connection to the provided buffer.
+     *
+     * @param buffer
+     *            The buffer to which the information should be appended.
+     */
+    @Override
+    public void toString(StringBuilder buffer) {
+        buffer.append("LDAP client connection from ");
+        buffer.append(clientAddress);
+        buffer.append(":");
+        buffer.append(clientPort);
+        buffer.append(" to ");
+        buffer.append(serverAddress);
+        buffer.append(":");
+        buffer.append(serverPort);
     }
 
-    buffer.append("\" security=\"");
-    buffer.append("none");
-
-    buffer.append("\" opsInProgress=\"");
-    buffer.append(operationsInProgress.size());
-    buffer.append("\"");
-
-    int countPSearch = getPersistentSearches().size();
-    if (countPSearch > 0)
-    {
-      buffer.append(" persistentSearches=\"");
-      buffer.append(countPSearch);
-      buffer.append("\"");
+    @Override
+    public boolean prepareTLS(LocalizableMessageBuilder unavailableReason) {
+        throw new UnsupportedOperationException();
     }
-    return buffer.toString();
-  }
 
-  /**
-   * Appends a string representation of this client connection to the
-   * provided buffer.
-   *
-   * @param buffer
-   *          The buffer to which the information should be appended.
-   */
-  @Override
-  public void toString(StringBuilder buffer)
-  {
-    buffer.append("LDAP client connection from ");
-    buffer.append(clientAddress);
-    buffer.append(":");
-    buffer.append(clientPort);
-    buffer.append(" to ");
-    buffer.append(serverAddress);
-    buffer.append(":");
-    buffer.append(serverPort);
-  }
-
-  @Override
-  public boolean prepareTLS(LocalizableMessageBuilder unavailableReason)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Retrieves the length of time in milliseconds that this client
-   * connection has been idle. <BR>
-   * <BR>
-   * Note that the default implementation will always return zero.
-   * Subclasses associated with connection handlers should override this
-   * method if they wish to provided idle time limit functionality.
-   *
-   * @return The length of time in milliseconds that this client
-   *         connection has been idle.
-   */
-  @Override
-  public long getIdleTime()
-  {
-    if (operationsInProgress.isEmpty()
-        && getPersistentSearches().isEmpty())
-    {
-      return TimeThread.getTime() - lastCompletionTime.get();
+    /**
+     * Retrieves the length of time in milliseconds that this client connection has been idle. <BR>
+     * <BR>
+     * Note that the default implementation will always return zero. Subclasses associated with connection handlers
+     * should override this method if they wish to provided idle time limit functionality.
+     *
+     * @return The length of time in milliseconds that this client connection has been idle.
+     */
+    @Override
+    public long getIdleTime() {
+        if (operationsInProgress.isEmpty() && getPersistentSearches().isEmpty()) {
+            return TimeThread.getTime() - lastCompletionTime.get();
+        } else {
+            // There's at least one operation in progress, so it's not idle.
+            return 0L;
+        }
     }
-    else
-    {
-      // There's at least one operation in progress, so it's not idle.
-      return 0L;
+
+    /**
+     * Return the certificate chain array associated with a connection.
+     *
+     * @return The array of certificates associated with a connection.
+     */
+    public Certificate[] getClientCertificateChain() {
+        return new Certificate[0];
     }
-  }
 
-  /**
-   * Return the certificate chain array associated with a connection.
-   *
-   * @return The array of certificates associated with a connection.
-   */
-  public Certificate[] getClientCertificateChain()
-  {
-    return new Certificate[0];
-  }
-
-  @Override
-  public int getSSF()
-  {
-    return 0;
-  }
+    @Override
+    public int getSSF() {
+        return 0;
+    }
 }

--
Gitblit v1.10.0