mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
25.07.2016 464f6dd52a1eeeef0f7b38d4dc1840501818a36f
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -105,1015 +105,848 @@
import io.reactivex.FlowableOnSubscribe;
/**
 * This class defines an LDAP client connection, which is a type of
 * client connection that will be accepted by an instance of the LDAP
 * connection handler and have its requests decoded by an LDAP request
 * handler.
 * This class defines an LDAP client connection, which is a type of client connection that will be accepted by an
 * instance of the LDAP connection handler and have its requests decoded by an LDAP request handler.
 */
public final class LDAPClientConnection2 extends ClientConnection implements
    TLSCapableConnection, ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>>
{
  private static final String REACTIVE_OUT = "reactive.out";
public final class LDAPClientConnection2 extends ClientConnection implements TLSCapableConnection,
        ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>> {
    private static final String REACTIVE_OUT = "reactive.out";
/** The tracer object for the debug logger. */
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
    /** The tracer object for the debug logger. */
    private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /** The time that the last operation was completed. */
  private final AtomicLong lastCompletionTime;
  /** The next operation ID that should be used for this connection. */
  private final AtomicLong nextOperationID;
    /** The time that the last operation was completed. */
    private final AtomicLong lastCompletionTime;
    /** The next operation ID that should be used for this connection. */
    private final AtomicLong nextOperationID;
  /**
   * Indicates whether the Directory Server believes this connection to be valid
   * and available for communication.
   */
  private volatile boolean connectionValid;
    /**
     * Indicates whether the Directory Server believes this connection to be valid and available for communication.
     */
    private volatile boolean connectionValid;
  /**
   * Indicates whether this connection is about to be closed. This will be used
   * to prevent accepting new requests while a disconnect is in progress.
   */
  private boolean disconnectRequested;
    /**
     * Indicates whether this connection is about to be closed. This will be used to prevent accepting new requests
     * while a disconnect is in progress.
     */
    private boolean disconnectRequested;
  /**
   * Indicates whether the connection should keep statistics regarding the
   * operations that it is performing.
   */
  private final boolean keepStats;
    /**
     * Indicates whether the connection should keep statistics regarding the operations that it is performing.
     */
    private final boolean keepStats;
  /** The set of all operations currently in progress on this connection. */
  private final ConcurrentHashMap<Integer, Operation> operationsInProgress;
    /** The set of all operations currently in progress on this connection. */
    private final ConcurrentHashMap<Integer, Operation> operationsInProgress;
  /**
   * The number of operations performed on this connection. Used to compare with
   * the resource limits of the network group.
   */
  private final AtomicLong operationsPerformed;
    /**
     * The number of operations performed on this connection. Used to compare with the resource limits of the network
     * group.
     */
    private final AtomicLong operationsPerformed;
  /** The port on the client from which this connection originated. */
  private final int clientPort;
  /** The LDAP version that the client is using to communicate with the server. */
  private int ldapVersion;
  /** The port on the server to which this client has connected. */
  private final int serverPort;
    /** The port on the client from which this connection originated. */
    private final int clientPort;
    /** The LDAP version that the client is using to communicate with the server. */
    private int ldapVersion;
    /** The port on the server to which this client has connected. */
    private final int serverPort;
  /** The reference to the connection handler that accepted this connection. */
  private final LDAPConnectionHandler2 connectionHandler;
  /** The statistics tracker associated with this client connection. */
  private final LDAPStatistics statTracker;
  private final boolean useNanoTime;
    /** The reference to the connection handler that accepted this connection. */
    private final LDAPConnectionHandler2 connectionHandler;
    /** The statistics tracker associated with this client connection. */
    private final LDAPStatistics statTracker;
    private final boolean useNanoTime;
  /** The connection ID assigned to this connection. */
  private final long connectionID;
    /** The connection ID assigned to this connection. */
    private final long connectionID;
  /** The lock used to provide threadsafe access to the set of operations in progress. */
  private final Object opsInProgressLock;
    /** The lock used to provide threadsafe access to the set of operations in progress. */
    private final Object opsInProgressLock;
  /** The socket channel with which this client connection is associated. */
  private final LDAPClientContext clientContext;
    /** The socket channel with which this client connection is associated. */
    private final LDAPClientContext clientContext;
  /** The string representation of the address of the client. */
  private final String clientAddress;
  /** The name of the protocol that the client is using to communicate with the server. */
  private final String protocol;
  /** The string representation of the address of the server to which the client has connected. */
  private final String serverAddress;
    /** The string representation of the address of the client. */
    private final String clientAddress;
    /** The name of the protocol that the client is using to communicate with the server. */
    private final String protocol;
    /** The string representation of the address of the server to which the client has connected. */
    private final String serverAddress;
  /**
   * Creates a new LDAP client connection with the provided information.
   *
   * @param connectionHandler
   *          The connection handler that accepted this connection.
   * @param clientContext
   *          The socket channel that may be used to communicate with
   *          the client.
   * @param  protocol String representing the protocol (LDAP or LDAP+SSL).
   * @throws LdapException
   * @throws DirectoryException If SSL initialisation fails.
   */
  LDAPClientConnection2(LDAPConnectionHandler2 connectionHandler, LDAPClientContext clientContext, String protocol,
          boolean keepStats)
  {
    this.connectionHandler = connectionHandler;
    this.clientContext = clientContext;
    opsInProgressLock = new Object();
    ldapVersion = 3;
    lastCompletionTime = new AtomicLong(TimeThread.getTime());
    nextOperationID = new AtomicLong(0);
    connectionValid = true;
    disconnectRequested = false;
    operationsInProgress = new ConcurrentHashMap<>();
    operationsPerformed = new AtomicLong(0);
    this.keepStats = keepStats;
    this.protocol = protocol;
    /**
     * Creates a new LDAP client connection with the provided information.
     *
     * @param connectionHandler
     *            The connection handler that accepted this connection.
     * @param clientContext
     *            The socket channel that may be used to communicate with the client.
     * @param protocol
     *            String representing the protocol (LDAP or LDAP+SSL).
     * @throws LdapException
     * @throws DirectoryException
     *             If SSL initialisation fails.
     */
    LDAPClientConnection2(LDAPConnectionHandler2 connectionHandler, LDAPClientContext clientContext, String protocol,
            boolean keepStats) {
        this.connectionHandler = connectionHandler;
        this.clientContext = clientContext;
        opsInProgressLock = new Object();
        ldapVersion = 3;
        lastCompletionTime = new AtomicLong(TimeThread.getTime());
        nextOperationID = new AtomicLong(0);
        connectionValid = true;
        disconnectRequested = false;
        operationsInProgress = new ConcurrentHashMap<>();
        operationsPerformed = new AtomicLong(0);
        this.keepStats = keepStats;
        this.protocol = protocol;
    clientAddress = clientContext.getPeerAddress().getAddress().getHostAddress();
    clientPort = clientContext.getPeerAddress().getPort();
    serverAddress = clientContext.getLocalAddress().getAddress().getHostAddress();
    serverPort = clientContext.getLocalAddress().getPort();
        clientAddress = clientContext.getPeerAddress().getAddress().getHostAddress();
        clientPort = clientContext.getPeerAddress().getPort();
        serverAddress = clientContext.getLocalAddress().getAddress().getHostAddress();
        serverPort = clientContext.getLocalAddress().getPort();
    statTracker = this.connectionHandler.getStatTracker();
    if (keepStats)
    {
      statTracker.updateConnect();
      this.useNanoTime = DirectoryServer.getUseNanoTime();
    }
    else
    {
      this.useNanoTime = false;
    }
    connectionID = DirectoryServer.newConnectionAccepted(this);
    clientContext.onDisconnect(new DisconnectListener()
    {
      @Override
      public void exceptionOccurred(LDAPClientContext context, Throwable error)
      {
        if (error instanceof LocalizableException)
        {
          disconnect(DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
        }
        else
        {
          disconnect(DisconnectReason.PROTOCOL_ERROR, true, null);
        }
      }
      @Override
      public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage)
      {
        disconnect(DisconnectReason.SERVER_ERROR, false, null);
      }
      @Override
      public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest)
      {
        disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
      }
    });
  }
  /**
   * Retrieves the connection ID assigned to this connection.
   *
   * @return The connection ID assigned to this connection.
   */
  @Override
  public long getConnectionID()
  {
    return connectionID;
  }
  /**
   * Retrieves the connection handler that accepted this client
   * connection.
   *
   * @return The connection handler that accepted this client
   *         connection.
   */
  @Override
  public ConnectionHandler<?> getConnectionHandler()
  {
    return connectionHandler;
  }
  /**
   * Retrieves the socket channel that can be used to communicate with
   * the client.
   *
   * @return The socket channel that can be used to communicate with the
   *         client.
   */
  @Override
  public SocketChannel getSocketChannel()
  {
    throw new UnsupportedOperationException();
  }
  /**
   * Retrieves the protocol that the client is using to communicate with
   * the Directory Server.
   *
   * @return The protocol that the client is using to communicate with
   *         the Directory Server.
   */
  @Override
  public String getProtocol()
  {
    return protocol;
  }
  /**
   * Retrieves a string representation of the address of the client.
   *
   * @return A string representation of the address of the client.
   */
  @Override
  public String getClientAddress()
  {
    return clientAddress;
  }
  /**
   * Retrieves the port number for this connection on the client system.
   *
   * @return The port number for this connection on the client system.
   */
  @Override
  public int getClientPort()
  {
    return clientPort;
  }
  /**
   * Retrieves a string representation of the address on the server to
   * which the client connected.
   *
   * @return A string representation of the address on the server to
   *         which the client connected.
   */
  @Override
  public String getServerAddress()
  {
    return serverAddress;
  }
  /**
   * Retrieves the port number for this connection on the server system.
   *
   * @return The port number for this connection on the server system.
   */
  @Override
  public int getServerPort()
  {
    return serverPort;
  }
  /**
   * Retrieves the <CODE>java.net.InetAddress</CODE> associated with the
   * remote client system.
   *
   * @return The <CODE>java.net.InetAddress</CODE> associated with the
   *         remote client system. It may be <CODE>null</CODE> if the
   *         client is not connected over an IP-based connection.
   */
  @Override
  public InetAddress getRemoteAddress()
  {
    return clientContext.getPeerAddress().getAddress();
  }
  /**
   * Retrieves the <CODE>java.net.InetAddress</CODE> for the Directory
   * Server system to which the client has established the connection.
   *
   * @return The <CODE>java.net.InetAddress</CODE> for the Directory
   *         Server system to which the client has established the
   *         connection. It may be <CODE>null</CODE> if the client is
   *         not connected over an IP-based connection.
   */
  @Override
  public InetAddress getLocalAddress()
  {
    return clientContext.getLocalAddress().getAddress();
  }
  @Override
  public boolean isConnectionValid()
  {
    return this.connectionValid;
  }
  /**
   * Indicates whether this client connection is currently using a
   * secure mechanism to communicate with the server. Note that this may
   * change over time based on operations performed by the client or
   * server (e.g., it may go from <CODE>false</CODE> to
   * <CODE>true</CODE> if the client uses the StartTLS extended
   * operation).
   *
   * @return <CODE>true</CODE> if the client connection is currently
   *         using a secure mechanism to communicate with the server, or
   *         <CODE>false</CODE> if not.
   */
  @Override
  public boolean isSecure()
  {
    return false;
  }
  /**
   * Sends a response to the client based on the information in the
   * provided operation.
   *
   * @param operation
   *          The operation for which to send the response.
   */
  @Override
  public void sendResponse(Operation operation)
  {
    // Since this is the final response for this operation, we can go
    // ahead and remove it from the "operations in progress" list. It
    // can't be canceled after this point, and this will avoid potential
    // race conditions in which the client immediately sends another
    // request with the same message ID as was used for this operation.
    if (keepStats) {
        long time;
        if (useNanoTime) {
            time = operation.getProcessingNanoTime();
        statTracker = this.connectionHandler.getStatTracker();
        if (keepStats) {
            statTracker.updateConnect();
            this.useNanoTime = DirectoryServer.getUseNanoTime();
        } else {
            time = operation.getProcessingTime();
            this.useNanoTime = false;
        }
        this.statTracker.updateOperationMonitoringData(
                operation.getOperationType(),
                time);
        connectionID = DirectoryServer.newConnectionAccepted(this);
        clientContext.onDisconnect(new DisconnectListener() {
            @Override
            public void exceptionOccurred(LDAPClientContext context, Throwable error) {
                if (error instanceof LocalizableException) {
                    disconnect(
                            DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
                } else {
                    disconnect(DisconnectReason.PROTOCOL_ERROR, true, null);
                }
            }
            @Override
            public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
                    String diagnosticMessage) {
                disconnect(DisconnectReason.SERVER_ERROR, false, null);
            }
            @Override
            public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
                disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
            }
        });
    }
    // Avoid sending the response if one has already been sent. This may happen
    // if operation processing encounters a run-time exception after sending the
    // response: the worker thread exception handling code will attempt to send
    // an error result to the client indicating that a problem occurred.
    if (removeOperationInProgress(operation.getMessageID()))
    {
      final Response response = operationToResponse(operation);
      final FlowableEmitter<Response> out = getOut(operation);
      if (response != null)
      {
    /**
     * Retrieves the connection ID assigned to this connection.
     *
     * @return The connection ID assigned to this connection.
     */
    @Override
    public long getConnectionID() {
        return connectionID;
    }
    /**
     * Retrieves the connection handler that accepted this client connection.
     *
     * @return The connection handler that accepted this client connection.
     */
    @Override
    public ConnectionHandler<?> getConnectionHandler() {
        return connectionHandler;
    }
    /**
     * Retrieves the socket channel that can be used to communicate with the client.
     *
     * @return The socket channel that can be used to communicate with the client.
     */
    @Override
    public SocketChannel getSocketChannel() {
        throw new UnsupportedOperationException();
    }
    /**
     * Retrieves the protocol that the client is using to communicate with the Directory Server.
     *
     * @return The protocol that the client is using to communicate with the Directory Server.
     */
    @Override
    public String getProtocol() {
        return protocol;
    }
    /**
     * Retrieves a string representation of the address of the client.
     *
     * @return A string representation of the address of the client.
     */
    @Override
    public String getClientAddress() {
        return clientAddress;
    }
    /**
     * Retrieves the port number for this connection on the client system.
     *
     * @return The port number for this connection on the client system.
     */
    @Override
    public int getClientPort() {
        return clientPort;
    }
    /**
     * Retrieves a string representation of the address on the server to which the client connected.
     *
     * @return A string representation of the address on the server to which the client connected.
     */
    @Override
    public String getServerAddress() {
        return serverAddress;
    }
    /**
     * Retrieves the port number for this connection on the server system.
     *
     * @return The port number for this connection on the server system.
     */
    @Override
    public int getServerPort() {
        return serverPort;
    }
    /**
     * Retrieves the <CODE>java.net.InetAddress</CODE> associated with the remote client system.
     *
     * @return The <CODE>java.net.InetAddress</CODE> associated with the remote client system. It may be
     *         <CODE>null</CODE> if the client is not connected over an IP-based connection.
     */
    @Override
    public InetAddress getRemoteAddress() {
        return clientContext.getPeerAddress().getAddress();
    }
    /**
     * Retrieves the <CODE>java.net.InetAddress</CODE> for the Directory Server system to which the client has
     * established the connection.
     *
     * @return The <CODE>java.net.InetAddress</CODE> for the Directory Server system to which the client has established
     *         the connection. It may be <CODE>null</CODE> if the client is not connected over an IP-based connection.
     */
    @Override
    public InetAddress getLocalAddress() {
        return clientContext.getLocalAddress().getAddress();
    }
    @Override
    public boolean isConnectionValid() {
        return this.connectionValid;
    }
    /**
     * Indicates whether this client connection is currently using a secure mechanism to communicate with the server.
     * Note that this may change over time based on operations performed by the client or server (e.g., it may go from
     * <CODE>false</CODE> to <CODE>true</CODE> if the client uses the StartTLS extended operation).
     *
     * @return <CODE>true</CODE> if the client connection is currently using a secure mechanism to communicate with the
     *         server, or <CODE>false</CODE> if not.
     */
    @Override
    public boolean isSecure() {
        return false;
    }
    /**
     * Sends a response to the client based on the information in the provided operation.
     *
     * @param operation
     *            The operation for which to send the response.
     */
    @Override
    public void sendResponse(Operation operation) {
        // Since this is the final response for this operation, we can go
        // ahead and remove it from the "operations in progress" list. It
        // can't be canceled after this point, and this will avoid potential
        // race conditions in which the client immediately sends another
        // request with the same message ID as was used for this operation.
        if (keepStats) {
            long time;
            if (useNanoTime) {
                time = operation.getProcessingNanoTime();
            } else {
                time = operation.getProcessingTime();
            }
            this.statTracker.updateOperationMonitoringData(operation.getOperationType(), time);
        }
        // Avoid sending the response if one has already been sent. This may happen
        // if operation processing encounters a run-time exception after sending the
        // response: the worker thread exception handling code will attempt to send
        // an error result to the client indicating that a problem occurred.
        if (removeOperationInProgress(operation.getMessageID())) {
            final Response response = operationToResponse(operation);
            final FlowableEmitter<Response> out = getOut(operation);
            if (response != null) {
                out.onNext(response);
            }
            out.onComplete();
        }
    }
    /**
     * Retrieves an LDAPMessage containing a response generated from the provided operation.
     *
     * @param operation
     *            The operation to use to generate the response LDAPMessage.
     * @return An LDAPMessage containing a response generated from the provided operation.
     */
    private Response operationToResponse(Operation operation) {
        ResultCode resultCode = operation.getResultCode();
        if (resultCode == null) {
            // This must mean that the operation has either not yet completed
            // or that it completed without a result for some reason. In any
            // case, log a message and set the response to "operations error".
            logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_NO_RESULT_CODE, operation.getOperationType(),
                    operation.getConnectionID(), operation.getOperationID());
            resultCode = DirectoryServer.getServerErrorResultCode();
        }
        LocalizableMessageBuilder errorMessage = operation.getErrorMessage();
        String matchedDN = operation.getMatchedDN() != null ? operation.getMatchedDN().toString() : null;
        // Referrals are not allowed for LDAPv2 clients.
        List<String> referralURLs;
        if (ldapVersion == 2) {
            referralURLs = null;
            if (resultCode == ResultCode.REFERRAL) {
                resultCode = ResultCode.CONSTRAINT_VIOLATION;
                errorMessage.append(ERR_LDAPV2_REFERRAL_RESULT_CHANGED.get());
            }
            List<String> opReferrals = operation.getReferralURLs();
            if (opReferrals != null && !opReferrals.isEmpty()) {
                StringBuilder referralsStr = new StringBuilder();
                Iterator<String> iterator = opReferrals.iterator();
                referralsStr.append(iterator.next());
                while (iterator.hasNext()) {
                    referralsStr.append(", ");
                    referralsStr.append(iterator.next());
                }
                errorMessage.append(ERR_LDAPV2_REFERRALS_OMITTED.get(referralsStr));
            }
        } else {
            referralURLs = operation.getReferralURLs();
        }
        final Result result;
        switch (operation.getOperationType()) {
        case ADD:
            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN);
            break;
        case BIND:
            result = Responses.newBindResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN)
                    .setServerSASLCredentials(((BindOperationBasis) operation).getServerSASLCredentials());
            break;
        case COMPARE:
            result = Responses.newCompareResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN);
            break;
        case DELETE:
            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN);
            break;
        case EXTENDED:
            // If this an LDAPv2 client, then we can't send this.
            if (ldapVersion == 2) {
                logger.error(ERR_LDAPV2_SKIPPING_EXTENDED_RESPONSE, getConnectionID(), operation.getOperationID(),
                        operation);
                return null;
            }
            ExtendedOperationBasis extOp = (ExtendedOperationBasis) operation;
            result = Responses.newGenericExtendedResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN).setOID(extOp.getResponseOID()).setValue(extOp.getResponseValue());
            break;
        case MODIFY:
            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN);
            break;
        case MODIFY_DN:
            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN);
            break;
        case SEARCH:
            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN);
            break;
        default:
            // This must be a type of operation that doesn't have a response.
            // This shouldn't happen, so log a message and return.
            logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_INVALID_OP, operation.getOperationType(), getConnectionID(),
                    operation.getOperationID(), operation);
            return null;
        }
        if (referralURLs != null) {
            result.getReferralURIs().addAll(referralURLs);
        }
        // Controls are not allowed for LDAPv2 clients.
        if (ldapVersion != 2) {
            for (Control control : operation.getResponseControls()) {
                result.addControl(Converters.from(control));
            }
        }
        return result;
    }
    /**
     * Sends the provided search result entry to the client.
     *
     * @param searchOperation
     *            The search operation with which the entry is associated
     * @param searchEntry
     *            The search result entry to be sent to the client
     */
    @Override
    public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry) {
        getEmitter(searchOperation).onNext(toResponse(searchEntry));
    }
    private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation) {
        return getOut(searchOperation);
    }
    private Response toResponse(SearchResultEntry searchEntry) {
        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
    }
    private FlowableEmitter<Response> getOut(Operation operation) {
        return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
    }
    /**
     * Sends the provided search result reference to the client.
     *
     * @param searchOperation
     *            The search operation with which the reference is associated.
     * @param searchReference
     *            The search result reference to be sent to the client.
     * @return <CODE>true</CODE> if the client is able to accept referrals, or <CODE>false</CODE> if the client cannot
     *         handle referrals and no more attempts should be made to send them for the associated search operation.
     */
    @Override
    public boolean sendSearchReference(SearchOperation searchOperation, SearchResultReference searchReference) {
        // Make sure this is not an LDAPv2 client. If it is, then they can't
        // see referrals so we'll not send anything. Also, throw an
        // exception so that the core server will know not to try sending
        // any more referrals to this client for the rest of the operation.
        if (ldapVersion == 2) {
            logger.error(ERR_LDAPV2_SKIPPING_SEARCH_REFERENCE, getConnectionID(), searchOperation.getOperationID(),
                    searchReference);
            return false;
        }
        final FlowableEmitter<Response> out = getOut(searchOperation);
        out.onNext(Converters.from(searchReference));
        return true;
    }
    /**
     * Sends the provided intermediate response message to the client.
     *
     * @param intermediateResponse
     *            The intermediate response message to be sent.
     * @return <CODE>true</CODE> if processing on the associated operation should continue, or <CODE>false</CODE> if
     *         not.
     */
    @Override
    protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse) {
        final Operation operation = intermediateResponse.getOperation();
        final FlowableEmitter<Response> out = getOut(operation);
        final Response response = Responses.newGenericIntermediateResponse(intermediateResponse.getOID(),
                intermediateResponse.getValue());
        for (Control control : intermediateResponse.getControls()) {
            response.addControl(Converters.from(control));
        }
        out.onNext(response);
      }
      out.onComplete();
    }
  }
  /**
   * Retrieves an LDAPMessage containing a response generated from the
   * provided operation.
   *
   * @param operation
   *          The operation to use to generate the response LDAPMessage.
   * @return An LDAPMessage containing a response generated from the
   *         provided operation.
   */
  private Response operationToResponse(Operation operation)
  {
    ResultCode resultCode = operation.getResultCode();
    if (resultCode == null)
    {
      // This must mean that the operation has either not yet completed
      // or that it completed without a result for some reason. In any
      // case, log a message and set the response to "operations error".
      logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_NO_RESULT_CODE, operation.getOperationType(),
          operation.getConnectionID(), operation.getOperationID());
      resultCode = DirectoryServer.getServerErrorResultCode();
        // The only reason we shouldn't continue processing is if the
        // connection is closed.
        return connectionValid;
    }
    LocalizableMessageBuilder errorMessage = operation.getErrorMessage();
    String matchedDN = operation.getMatchedDN() != null ? operation.getMatchedDN().toString() : null;
    // Referrals are not allowed for LDAPv2 clients.
    List<String> referralURLs;
    if (ldapVersion == 2)
    {
      referralURLs = null;
      if (resultCode == ResultCode.REFERRAL)
      {
        resultCode = ResultCode.CONSTRAINT_VIOLATION;
        errorMessage.append(ERR_LDAPV2_REFERRAL_RESULT_CHANGED.get());
      }
      List<String> opReferrals = operation.getReferralURLs();
      if (opReferrals != null && !opReferrals.isEmpty())
      {
        StringBuilder referralsStr = new StringBuilder();
        Iterator<String> iterator = opReferrals.iterator();
        referralsStr.append(iterator.next());
        while (iterator.hasNext())
        {
          referralsStr.append(", ");
          referralsStr.append(iterator.next());
        }
        errorMessage.append(ERR_LDAPV2_REFERRALS_OMITTED.get(referralsStr));
      }
    }
    else
    {
      referralURLs = operation.getReferralURLs();
    }
    final Result result;
    switch (operation.getOperationType())
    {
    case ADD:
      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
      break;
    case BIND:
      result = Responses.newBindResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN)
                        .setServerSASLCredentials(((BindOperationBasis) operation).getServerSASLCredentials());
      break;
    case COMPARE:
      result = Responses.newCompareResult(resultCode)
                        .setDiagnosticMessage(errorMessage.toString())
                        .setMatchedDN(matchedDN);
      break;
    case DELETE:
      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
      break;
    case EXTENDED:
      // If this an LDAPv2 client, then we can't send this.
      if (ldapVersion == 2)
      {
        logger.error(ERR_LDAPV2_SKIPPING_EXTENDED_RESPONSE,
            getConnectionID(), operation.getOperationID(), operation);
        return null;
      }
      ExtendedOperationBasis extOp = (ExtendedOperationBasis) operation;
      result = Responses.newGenericExtendedResult(resultCode)
                        .setDiagnosticMessage(errorMessage.toString())
                        .setMatchedDN(matchedDN)
                        .setOID(extOp.getResponseOID()).setValue(extOp.getResponseValue());
      break;
    case MODIFY:
      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
      break;
    case MODIFY_DN:
      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
      break;
    case SEARCH:
      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
      break;
    default:
      // This must be a type of operation that doesn't have a response.
      // This shouldn't happen, so log a message and return.
      logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_INVALID_OP, operation.getOperationType(), getConnectionID(),
          operation.getOperationID(), operation);
      return null;
    }
    if (referralURLs != null)
    {
      result.getReferralURIs().addAll(referralURLs);
    }
    // Controls are not allowed for LDAPv2 clients.
    if (ldapVersion != 2)
    {
      for(Control control : operation.getResponseControls()) {
        result.addControl(Converters.from(control));
      }
    }
    return result;
  }
  /**
   * Sends the provided search result entry to the client.
   *
   * @param searchOperation
   *          The search operation with which the entry is associated
   * @param searchEntry
   *          The search result entry to be sent to the client
   */
  @Override
  public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry)
  {
    getEmitter(searchOperation).onNext(toResponse(searchEntry));
  }
  private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation)
  {
    return getOut(searchOperation);
  }
  private Response toResponse(SearchResultEntry searchEntry)
  {
    return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
  }
  private FlowableEmitter<Response> getOut(Operation operation)
  {
    return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
  }
  /**
   * Sends the provided search result reference to the client.
   *
   * @param searchOperation
   *          The search operation with which the reference is
   *          associated.
   * @param searchReference
   *          The search result reference to be sent to the client.
   * @return <CODE>true</CODE> if the client is able to accept
   *         referrals, or <CODE>false</CODE> if the client cannot
   *         handle referrals and no more attempts should be made to
   *         send them for the associated search operation.
   */
  @Override
  public boolean sendSearchReference(SearchOperation searchOperation, SearchResultReference searchReference)
  {
    // Make sure this is not an LDAPv2 client. If it is, then they can't
    // see referrals so we'll not send anything. Also, throw an
    // exception so that the core server will know not to try sending
    // any more referrals to this client for the rest of the operation.
    if (ldapVersion == 2)
    {
      logger.error(ERR_LDAPV2_SKIPPING_SEARCH_REFERENCE, getConnectionID(),
              searchOperation.getOperationID(), searchReference);
      return false;
    }
    final FlowableEmitter<Response> out = getOut(searchOperation);
    out.onNext(Converters.from(searchReference));
    return true;
  }
  /**
   * Sends the provided intermediate response message to the client.
   *
   * @param intermediateResponse
   *          The intermediate response message to be sent.
   * @return <CODE>true</CODE> if processing on the associated operation
   *         should continue, or <CODE>false</CODE> if not.
   */
  @Override
  protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse)
  {
    final Operation operation = intermediateResponse.getOperation();
    final FlowableEmitter<Response> out = getOut(operation);
    final Response response =
        Responses.newGenericIntermediateResponse(intermediateResponse.getOID(), intermediateResponse.getValue());
    for (Control control : intermediateResponse.getControls())
    {
      response.addControl(Converters.from(control));
    }
    out.onNext(response);
    // The only reason we shouldn't continue processing is if the
    // connection is closed.
    return connectionValid;
  }
  /**
   * Closes the connection to the client, optionally sending it a
   * message indicating the reason for the closure. Note that the
   * ability to send a notice of disconnection may not be available for
   * all protocols or under all circumstances.
   *
   * @param disconnectReason
   *          The disconnect reason that provides the generic cause for
   *          the disconnect.
   * @param sendNotification
   *          Indicates whether to try to provide notification to the
   *          client that the connection will be closed.
   * @param message
   *          The message to include in the disconnect notification
   *          response. It may be <CODE>null</CODE> if no message is to
   *          be sent.
   */
  @Override
  public void disconnect(DisconnectReason disconnectReason,
      boolean sendNotification, LocalizableMessage message)
  {
    // Set a flag indicating that the connection is being terminated so
    // that no new requests will be accepted. Also cancel all operations
    // in progress.
    synchronized (opsInProgressLock)
    {
      // If we are already in the middle of a disconnect, then don't
      // do anything.
      if (disconnectRequested)
      {
        return;
      }
      disconnectRequested = true;
    }
    if (keepStats)
    {
      statTracker.updateDisconnect();
    }
    if (connectionID >= 0)
    {
      DirectoryServer.connectionClosed(this);
    }
    // Indicate that this connection is no longer valid.
    connectionValid = false;
    final LocalizableMessage cancelMessage;
    if (message != null)
    {
      cancelMessage = new LocalizableMessageBuilder()
          .append(disconnectReason.getClosureMessage())
          .append(": ")
          .append(message)
          .toMessage();
    }
    else
    {
      cancelMessage = disconnectReason.getClosureMessage();
    }
    cancelAllOperations(new CancelRequest(true, cancelMessage));
    finalizeConnectionInternal();
    // See if we should send a notification to the client. If so, then
    // construct and send a notice of disconnection unsolicited
    // response. Note that we cannot send this notification to an LDAPv2 client.
    if (sendNotification && ldapVersion != 2)
    {
      try
      {
        LocalizableMessage errMsg = message != null ? message : INFO_LDAP_CLIENT_GENERIC_NOTICE_OF_DISCONNECTION.get();
        clientContext.disconnect(ResultCode.valueOf(toResultCode(disconnectReason)), errMsg.toString());
      }
      catch (Exception e)
      {
        // NYI -- Log a message indicating that we couldn't send the
        // notice of disconnection.
        logger.traceException(e);
      }
    }
    else
    {
        clientContext.disconnect();
    }
    // NYI -- Deregister the client connection from any server components that
    // might know about it.
    logDisconnect(this, disconnectReason, message);
    try
    {
      PluginConfigManager pluginManager = DirectoryServer.getPluginConfigManager();
      pluginManager.invokePostDisconnectPlugins(this, disconnectReason, message);
    }
    catch (Exception e)
    {
      logger.traceException(e);
    }
  }
  private int toResultCode(DisconnectReason disconnectReason)
  {
    switch (disconnectReason)
    {
    case PROTOCOL_ERROR:
      return LDAPResultCode.PROTOCOL_ERROR;
    case SERVER_SHUTDOWN:
      return LDAPResultCode.UNAVAILABLE;
    case SERVER_ERROR:
      return DirectoryServer.getServerErrorResultCode().intValue();
    case ADMIN_LIMIT_EXCEEDED:
    case IDLE_TIME_LIMIT_EXCEEDED:
    case MAX_REQUEST_SIZE_EXCEEDED:
    case IO_TIMEOUT:
      return LDAPResultCode.ADMIN_LIMIT_EXCEEDED;
    case CONNECTION_REJECTED:
      return LDAPResultCode.CONSTRAINT_VIOLATION;
    case INVALID_CREDENTIALS:
      return LDAPResultCode.INVALID_CREDENTIALS;
    default:
      return LDAPResultCode.OTHER;
    }
  }
  /**
   * Retrieves the set of operations in progress for this client
   * connection. This list must not be altered by any caller.
   *
   * @return The set of operations in progress for this client
   *         connection.
   */
  @Override
  public Collection<Operation> getOperationsInProgress()
  {
    return operationsInProgress.values();
  }
  /**
   * Retrieves the operation in progress with the specified message ID.
   *
   * @param messageID
   *          The message ID for the operation to retrieve.
   * @return The operation in progress with the specified message ID, or
   *         <CODE>null</CODE> if no such operation could be found.
   */
  @Override
  public Operation getOperationInProgress(int messageID)
  {
    return operationsInProgress.get(messageID);
  }
  /**
   * Adds the provided operation to the set of operations in progress
   * for this client connection.
   *
   * @param operation
   *          The operation to add to the set of operations in progress
   *          for this client connection.
   * @throws DirectoryException
   *           If the operation is not added for some reason (e.g., the
   *           client already has reached the maximum allowed concurrent
   *           requests).
   */
  private void addOperationInProgress(final QueueingStrategy queueingStrategy, Operation operation)
      throws DirectoryException
  {
    int messageID = operation.getMessageID();
    // We need to grab a lock to ensure that no one else can add
    // operations to the queue while we are performing some preliminary
    // checks.
    try
    {
      synchronized (opsInProgressLock)
      {
        // If we're already in the process of disconnecting the client,
        // then reject the operation.
        if (disconnectRequested)
        {
          LocalizableMessage message = WARN_CLIENT_DISCONNECT_IN_PROGRESS.get();
          throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
              message);
        }
        // Add the operation to the list of operations in progress for
        // this connection.
        Operation op = operationsInProgress.putIfAbsent(messageID, operation);
        // See if there is already an operation in progress with the
        // same message ID. If so, then we can't allow it.
        if (op != null)
        {
          LocalizableMessage message =
            WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
          throw new DirectoryException(ResultCode.PROTOCOL_ERROR,
              message);
        }
      }
      // Try to add the operation to the work queue,
      // or run it synchronously (typically for the administration
      // connector)
      queueingStrategy.enqueueRequest(operation);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      operationsInProgress.remove(messageID);
      lastCompletionTime.set(TimeThread.getTime());
      throw de;
    }
    catch (Exception e)
    {
      logger.traceException(e);
      LocalizableMessage message =
        WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
      throw new DirectoryException(DirectoryServer
          .getServerErrorResultCode(), message, e);
    }
  }
  /**
   * Removes the provided operation from the set of operations in
   * progress for this client connection. Note that this does not make
   * any attempt to cancel any processing that may already be in
   * progress for the operation.
   *
   * @param messageID
   *          The message ID of the operation to remove from the set of
   *          operations in progress.
   * @return <CODE>true</CODE> if the operation was found and removed
   *         from the set of operations in progress, or
   *         <CODE>false</CODE> if not.
   */
  @Override
  public boolean removeOperationInProgress(int messageID)
  {
    Operation operation = operationsInProgress.remove(messageID);
    if (operation == null)
    {
      return false;
    }
    if (operation.getOperationType() == OperationType.ABANDON
        && keepStats
        && operation.getResultCode() == ResultCode.CANCELLED)
    {
      statTracker.updateAbandonedOperation();
    }
    lastCompletionTime.set(TimeThread.getTime());
    return true;
  }
  /**
   * Attempts to cancel the specified operation.
   *
   * @param messageID
   *          The message ID of the operation to cancel.
   * @param cancelRequest
   *          An object providing additional information about how the
   *          cancel should be processed.
   * @return A cancel result that either indicates that the cancel was
   *         successful or provides a reason that it was not.
   */
  @Override
  public CancelResult cancelOperation(int messageID,
      CancelRequest cancelRequest)
  {
    Operation op = operationsInProgress.get(messageID);
    if (op != null)
    {
      return op.cancel(cancelRequest);
    }
    // See if the operation is in the list of persistent searches.
    for (PersistentSearch ps : getPersistentSearches())
    {
      if (ps.getMessageID() == messageID)
      {
        // We only need to find the first persistent search
        // associated with the provided message ID. The persistent search
        // will ensure that all other related persistent searches are cancelled.
        return ps.cancel();
      }
    }
    return new CancelResult(ResultCode.NO_SUCH_OPERATION, null);
  }
  /**
   * Attempts to cancel all operations in progress on this connection.
   *
   * @param cancelRequest
   *          An object providing additional information about how the
   *          cancel should be processed.
   */
  @Override
  public void cancelAllOperations(CancelRequest cancelRequest)
  {
    // Make sure that no one can add any new operations.
    synchronized (opsInProgressLock)
    {
      try
      {
        for (Operation o : operationsInProgress.values())
        {
          try
          {
            o.abort(cancelRequest);
            // TODO: Assume its cancelled?
            if (keepStats)
            {
              statTracker.updateAbandonedOperation();
    /**
     * Closes the connection to the client, optionally sending it a message indicating the reason for the closure. Note
     * that the ability to send a notice of disconnection may not be available for all protocols or under all
     * circumstances.
     *
     * @param disconnectReason
     *            The disconnect reason that provides the generic cause for the disconnect.
     * @param sendNotification
     *            Indicates whether to try to provide notification to the client that the connection will be closed.
     * @param message
     *            The message to include in the disconnect notification response. It may be <CODE>null</CODE> if no
     *            message is to be sent.
     */
    @Override
    public void disconnect(DisconnectReason disconnectReason, boolean sendNotification, LocalizableMessage message) {
        // Set a flag indicating that the connection is being terminated so
        // that no new requests will be accepted. Also cancel all operations
        // in progress.
        synchronized (opsInProgressLock) {
            // If we are already in the middle of a disconnect, then don't
            // do anything.
            if (disconnectRequested) {
                return;
            }
          }
          catch (Exception e)
          {
            disconnectRequested = true;
        }
        if (keepStats) {
            statTracker.updateDisconnect();
        }
        if (connectionID >= 0) {
            DirectoryServer.connectionClosed(this);
        }
        // Indicate that this connection is no longer valid.
        connectionValid = false;
        final LocalizableMessage cancelMessage;
        if (message != null) {
            cancelMessage = new LocalizableMessageBuilder().append(disconnectReason.getClosureMessage()).append(": ")
                    .append(message).toMessage();
        } else {
            cancelMessage = disconnectReason.getClosureMessage();
        }
        cancelAllOperations(new CancelRequest(true, cancelMessage));
        finalizeConnectionInternal();
        // See if we should send a notification to the client. If so, then
        // construct and send a notice of disconnection unsolicited
        // response. Note that we cannot send this notification to an LDAPv2 client.
        if (sendNotification && ldapVersion != 2) {
            try {
                LocalizableMessage errMsg = message != null ? message
                        : INFO_LDAP_CLIENT_GENERIC_NOTICE_OF_DISCONNECTION.get();
                clientContext.disconnect(ResultCode.valueOf(toResultCode(disconnectReason)), errMsg.toString());
            } catch (Exception e) {
                // NYI -- Log a message indicating that we couldn't send the
                // notice of disconnection.
                logger.traceException(e);
            }
        } else {
            clientContext.disconnect();
        }
        // NYI -- Deregister the client connection from any server components that
        // might know about it.
        logDisconnect(this, disconnectReason, message);
        try {
            PluginConfigManager pluginManager = DirectoryServer.getPluginConfigManager();
            pluginManager.invokePostDisconnectPlugins(this, disconnectReason, message);
        } catch (Exception e) {
            logger.traceException(e);
          }
        }
        if (!operationsInProgress.isEmpty()
            || !getPersistentSearches().isEmpty())
        {
          lastCompletionTime.set(TimeThread.getTime());
        }
        operationsInProgress.clear();
        for (PersistentSearch persistentSearch : getPersistentSearches())
        {
          persistentSearch.cancel();
        }
      }
      catch (Exception e)
      {
        logger.traceException(e);
      }
    }
  }
  /**
   * Attempts to cancel all operations in progress on this connection
   * except the operation with the specified message ID.
   *
   * @param cancelRequest
   *          An object providing additional information about how the
   *          cancel should be processed.
   * @param messageID
   *          The message ID of the operation that should not be
   *          canceled.
   */
  @Override
  public void cancelAllOperationsExcept(CancelRequest cancelRequest,
      int messageID)
  {
    // Make sure that no one can add any new operations.
    synchronized (opsInProgressLock)
    {
      try
      {
        for (int msgID : operationsInProgress.keySet())
        {
          if (msgID == messageID)
          {
            continue;
          }
          Operation o = operationsInProgress.get(msgID);
          if (o != null)
          {
            try
            {
              o.abort(cancelRequest);
              // TODO: Assume its cancelled?
              if (keepStats)
              {
                statTracker.updateAbandonedOperation();
              }
            }
            catch (Exception e)
            {
              logger.traceException(e);
            }
          }
          operationsInProgress.remove(msgID);
          lastCompletionTime.set(TimeThread.getTime());
    private int toResultCode(DisconnectReason disconnectReason) {
        switch (disconnectReason) {
        case PROTOCOL_ERROR:
            return LDAPResultCode.PROTOCOL_ERROR;
        case SERVER_SHUTDOWN:
            return LDAPResultCode.UNAVAILABLE;
        case SERVER_ERROR:
            return DirectoryServer.getServerErrorResultCode().intValue();
        case ADMIN_LIMIT_EXCEEDED:
        case IDLE_TIME_LIMIT_EXCEEDED:
        case MAX_REQUEST_SIZE_EXCEEDED:
        case IO_TIMEOUT:
            return LDAPResultCode.ADMIN_LIMIT_EXCEEDED;
        case CONNECTION_REJECTED:
            return LDAPResultCode.CONSTRAINT_VIOLATION;
        case INVALID_CREDENTIALS:
            return LDAPResultCode.INVALID_CREDENTIALS;
        default:
            return LDAPResultCode.OTHER;
        }
        for (PersistentSearch persistentSearch : getPersistentSearches())
        {
          if (persistentSearch.getMessageID() == messageID)
          {
            continue;
          }
          persistentSearch.cancel();
          lastCompletionTime.set(TimeThread.getTime());
        }
      }
      catch (Exception e)
      {
        logger.traceException(e);
      }
    }
  }
  @Override
  public Selector getWriteSelector()
  {
    throw new UnsupportedOperationException();
  }
    /**
     * Retrieves the set of operations in progress for this client connection. This list must not be altered by any
     * caller.
     *
     * @return The set of operations in progress for this client connection.
     */
    @Override
    public Collection<Operation> getOperationsInProgress() {
        return operationsInProgress.values();
    }
  @Override
  public long getMaxBlockedWriteTimeLimit()
  {
    return connectionHandler.getMaxBlockedWriteTimeLimit();
  }
    /**
     * Retrieves the operation in progress with the specified message ID.
     *
     * @param messageID
     *            The message ID for the operation to retrieve.
     * @return The operation in progress with the specified message ID, or <CODE>null</CODE> if no such operation could
     *         be found.
     */
    @Override
    public Operation getOperationInProgress(int messageID) {
        return operationsInProgress.get(messageID);
    }
  /**
   * Returns the total number of operations initiated on this
   * connection.
   *
   * @return the total number of operations on this connection
   */
  @Override
  public long getNumberOfOperations()
  {
    return operationsPerformed.get();
  }
    /**
     * Adds the provided operation to the set of operations in progress for this client connection.
     *
     * @param operation
     *            The operation to add to the set of operations in progress for this client connection.
     * @throws DirectoryException
     *             If the operation is not added for some reason (e.g., the client already has reached the maximum
     *             allowed concurrent requests).
     */
    private void addOperationInProgress(final QueueingStrategy queueingStrategy, Operation operation)
            throws DirectoryException {
        int messageID = operation.getMessageID();
  /**
   * Processes the provided LDAP message read from the client and takes
   * whatever action is appropriate. For most requests, this will
   * include placing the operation in the work queue. Certain requests
   * (in particular, abandons and unbinds) will be processed directly.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message to process.
   * @return <CODE>true</CODE> if the appropriate action was taken for
   *         the request, or <CODE>false</CODE> if there was a fatal
   *         error and the client has been disconnected as a result, or
   *         if the client unbound from the server.
   */
        // We need to grab a lock to ensure that no one else can add
        // operations to the queue while we are performing some preliminary
        // checks.
        try {
            synchronized (opsInProgressLock) {
                // If we're already in the process of disconnecting the client,
                // then reject the operation.
                if (disconnectRequested) {
                    LocalizableMessage message = WARN_CLIENT_DISCONNECT_IN_PROGRESS.get();
                    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
                }
                // Add the operation to the list of operations in progress for
                // this connection.
                Operation op = operationsInProgress.putIfAbsent(messageID, operation);
                // See if there is already an operation in progress with the
                // same message ID. If so, then we can't allow it.
                if (op != null) {
                    LocalizableMessage message = WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
                    throw new DirectoryException(ResultCode.PROTOCOL_ERROR, message);
                }
            }
            // Try to add the operation to the work queue,
            // or run it synchronously (typically for the administration
            // connector)
            queueingStrategy.enqueueRequest(operation);
        } catch (DirectoryException de) {
            logger.traceException(de);
            operationsInProgress.remove(messageID);
            lastCompletionTime.set(TimeThread.getTime());
            throw de;
        } catch (Exception e) {
            logger.traceException(e);
            LocalizableMessage message = WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
            throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message, e);
        }
    }
    /**
     * Removes the provided operation from the set of operations in progress for this client connection. Note that this
     * does not make any attempt to cancel any processing that may already be in progress for the operation.
     *
     * @param messageID
     *            The message ID of the operation to remove from the set of operations in progress.
     * @return <CODE>true</CODE> if the operation was found and removed from the set of operations in progress, or
     *         <CODE>false</CODE> if not.
     */
    @Override
    public boolean removeOperationInProgress(int messageID) {
        Operation operation = operationsInProgress.remove(messageID);
        if (operation == null) {
            return false;
        }
        if (operation.getOperationType() == OperationType.ABANDON && keepStats
                && operation.getResultCode() == ResultCode.CANCELLED) {
            statTracker.updateAbandonedOperation();
        }
        lastCompletionTime.set(TimeThread.getTime());
        return true;
    }
    /**
     * Attempts to cancel the specified operation.
     *
     * @param messageID
     *            The message ID of the operation to cancel.
     * @param cancelRequest
     *            An object providing additional information about how the cancel should be processed.
     * @return A cancel result that either indicates that the cancel was successful or provides a reason that it was
     *         not.
     */
    @Override
    public CancelResult cancelOperation(int messageID, CancelRequest cancelRequest) {
        Operation op = operationsInProgress.get(messageID);
        if (op != null) {
            return op.cancel(cancelRequest);
        }
        // See if the operation is in the list of persistent searches.
        for (PersistentSearch ps : getPersistentSearches()) {
            if (ps.getMessageID() == messageID) {
                // We only need to find the first persistent search
                // associated with the provided message ID. The persistent search
                // will ensure that all other related persistent searches are cancelled.
                return ps.cancel();
            }
        }
        return new CancelResult(ResultCode.NO_SUCH_OPERATION, null);
    }
    /**
     * Attempts to cancel all operations in progress on this connection.
     *
     * @param cancelRequest
     *            An object providing additional information about how the cancel should be processed.
     */
    @Override
    public void cancelAllOperations(CancelRequest cancelRequest) {
        // Make sure that no one can add any new operations.
        synchronized (opsInProgressLock) {
            try {
                for (Operation o : operationsInProgress.values()) {
                    try {
                        o.abort(cancelRequest);
                        // TODO: Assume its cancelled?
                        if (keepStats) {
                            statTracker.updateAbandonedOperation();
                        }
                    } catch (Exception e) {
                        logger.traceException(e);
                    }
                }
                if (!operationsInProgress.isEmpty() || !getPersistentSearches().isEmpty()) {
                    lastCompletionTime.set(TimeThread.getTime());
                }
                operationsInProgress.clear();
                for (PersistentSearch persistentSearch : getPersistentSearches()) {
                    persistentSearch.cancel();
                }
            } catch (Exception e) {
                logger.traceException(e);
            }
        }
    }
    /**
     * Attempts to cancel all operations in progress on this connection except the operation with the specified message
     * ID.
     *
     * @param cancelRequest
     *            An object providing additional information about how the cancel should be processed.
     * @param messageID
     *            The message ID of the operation that should not be canceled.
     */
    @Override
    public void cancelAllOperationsExcept(CancelRequest cancelRequest, int messageID) {
        // Make sure that no one can add any new operations.
        synchronized (opsInProgressLock) {
            try {
                for (int msgID : operationsInProgress.keySet()) {
                    if (msgID == messageID) {
                        continue;
                    }
                    Operation o = operationsInProgress.get(msgID);
                    if (o != null) {
                        try {
                            o.abort(cancelRequest);
                            // TODO: Assume its cancelled?
                            if (keepStats) {
                                statTracker.updateAbandonedOperation();
                            }
                        } catch (Exception e) {
                            logger.traceException(e);
                        }
                    }
                    operationsInProgress.remove(msgID);
                    lastCompletionTime.set(TimeThread.getTime());
                }
                for (PersistentSearch persistentSearch : getPersistentSearches()) {
                    if (persistentSearch.getMessageID() == messageID) {
                        continue;
                    }
                    persistentSearch.cancel();
                    lastCompletionTime.set(TimeThread.getTime());
                }
            } catch (Exception e) {
                logger.traceException(e);
            }
        }
    }
    @Override
    public Selector getWriteSelector() {
        throw new UnsupportedOperationException();
    }
    @Override
    public long getMaxBlockedWriteTimeLimit() {
        return connectionHandler.getMaxBlockedWriteTimeLimit();
    }
    /**
     * Returns the total number of operations initiated on this connection.
     *
     * @return the total number of operations on this connection
     */
    @Override
    public long getNumberOfOperations() {
        return operationsPerformed.get();
    }
    /**
     * Processes the provided LDAP message read from the client and takes whatever action is appropriate. For most
     * requests, this will include placing the operation in the work queue. Certain requests (in particular, abandons
     * and unbinds) will be processed directly.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message to process.
     * @return <CODE>true</CODE> if the appropriate action was taken for the request, or <CODE>false</CODE> if there was
     *         a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
     */
    @Override
    public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
        return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
@@ -1124,855 +957,693 @@
        }, BackpressureMode.NONE).onBackpressureBuffer(64, null, BackpressureOverflowStrategy.ERROR)));
    }
  private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final FlowableEmitter<Response> out)
  {
    if (keepStats)
    {
      statTracker.updateMessageRead(message);
    private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final FlowableEmitter<Response> out) {
        if (keepStats) {
            statTracker.updateMessageRead(message);
        }
        operationsPerformed.getAndIncrement();
        List<Control> opControls = message.getControls();
        // FIXME -- See if there is a bind in progress. If so, then deny
        // most kinds of operations.
        // Figure out what type of operation we're dealing with based on the
        // LDAP message. Abandon and unbind requests will be processed here.
        // All other types of requests will be encapsulated into operations
        // and append into the work queue to be picked up by a worker
        // thread. Any other kinds of LDAP messages (e.g., response
        // messages) are illegal and will result in the connection being
        // terminated.
        try {
            if (bindInProgress.get()) {
                throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_BIND_IN_PROGRESS.get());
            } else if (startTLSInProgress.get()) {
                throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_STARTTLS_IN_PROGRESS.get());
            } else if (saslBindInProgress.get() && message.getProtocolOpType() != OP_TYPE_BIND_REQUEST) {
                throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_SASLBIND_IN_PROGRESS.get());
            }
            boolean result;
            switch (message.getProtocolOpType()) {
            case OP_TYPE_ABANDON_REQUEST:
                return processAbandonRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_ADD_REQUEST:
                return processAddRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_BIND_REQUEST:
                boolean isSaslBind =
                    message.getBindRequestProtocolOp().getAuthenticationType() == AuthenticationType.SASL;
                bindInProgress.set(true);
                if (isSaslBind) {
                    saslBindInProgress.set(true);
                }
                result = processBindRequest(queueingStrategy, message, opControls, out);
                if (!result) {
                    bindInProgress.set(false);
                    if (isSaslBind) {
                        saslBindInProgress.set(false);
                    }
                }
                return result;
            case OP_TYPE_COMPARE_REQUEST:
                return processCompareRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_DELETE_REQUEST:
                return processDeleteRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_EXTENDED_REQUEST:
                boolean isStartTlsRequest = OID_START_TLS_REQUEST.equals(message.getExtendedRequestProtocolOp()
                        .getOID());
                if (isStartTlsRequest) {
                    startTLSInProgress.set(true);
                }
                result = processExtendedRequest(queueingStrategy, message, opControls, out);
                if (!result && isStartTlsRequest) {
                    startTLSInProgress.set(false);
                }
                return result;
            case OP_TYPE_MODIFY_REQUEST:
                return processModifyRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_MODIFY_DN_REQUEST:
                return processModifyDNRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_SEARCH_REQUEST:
                return processSearchRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_UNBIND_REQUEST:
                return processUnbindRequest(message, opControls);
            default:
                LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_INVALID_REQUEST_TYPE.get(
                        message.getProtocolOpName(), message.getMessageID());
                disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
                return false;
            }
        } catch (Exception e) {
            logger.traceException(e);
            LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_PROCESSING_FAILURE.get(message.getProtocolOpName(),
                    message.getMessageID(), e);
            disconnect(DisconnectReason.SERVER_ERROR, true, msg);
            return false;
        }
    }
    operationsPerformed.getAndIncrement();
    List<Control> opControls = message.getControls();
    /**
     * Processes the provided LDAP message as an abandon request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the abandon request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processAbandonRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            disconnectControlsNotAllowed();
            return false;
        }
    // FIXME -- See if there is a bind in progress. If so, then deny
    // most kinds of operations.
        // Create the abandon operation and add it into the work queue.
        AbandonRequestProtocolOp protocolOp = message.getAbandonRequestProtocolOp();
        AbandonOperationBasis abandonOp = new AbandonOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getIDToAbandon());
        abandonOp.setAttachment(REACTIVE_OUT, out);
    // Figure out what type of operation we're dealing with based on the
    // LDAP message. Abandon and unbind requests will be processed here.
    // All other types of requests will be encapsulated into operations
    // and append into the work queue to be picked up by a worker
    // thread. Any other kinds of LDAP messages (e.g., response
    // messages) are illegal and will result in the connection being
    // terminated.
    try
    {
      if (bindInProgress.get())
      {
        throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_BIND_IN_PROGRESS.get());
      }
      else if (startTLSInProgress.get())
      {
        throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_STARTTLS_IN_PROGRESS.get());
      }
      else if (saslBindInProgress.get() && message.getProtocolOpType() != OP_TYPE_BIND_REQUEST)
      {
        throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_SASLBIND_IN_PROGRESS.get());
      }
        try {
            addOperationInProgress(queueingStrategy, abandonOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
      boolean result;
      switch (message.getProtocolOpType())
      {
      case OP_TYPE_ABANDON_REQUEST:
        return processAbandonRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_ADD_REQUEST:
        return processAddRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_BIND_REQUEST:
        boolean isSaslBind = message.getBindRequestProtocolOp().getAuthenticationType() == AuthenticationType.SASL;
        bindInProgress.set(true);
        if (isSaslBind)
        {
          saslBindInProgress.set(true);
            // Don't send an error response since abandon operations
            // don't have a response.
        }
        result = processBindRequest(queueingStrategy, message, opControls, out);
        if(!result)
        {
          bindInProgress.set(false);
          if (isSaslBind)
          {
            saslBindInProgress.set(false);
          }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as an add request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the add request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processAddRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
        }
        return result;
      case OP_TYPE_COMPARE_REQUEST:
        return processCompareRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_DELETE_REQUEST:
        return processDeleteRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_EXTENDED_REQUEST:
        boolean isStartTlsRequest = OID_START_TLS_REQUEST.equals(message.getExtendedRequestProtocolOp().getOID());
        if (isStartTlsRequest)
        {
          startTLSInProgress.set(true);
        // Create the add operation and add it into the work queue.
        AddRequestProtocolOp protocolOp = message.getAddRequestProtocolOp();
        AddOperationBasis addOp = new AddOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getAttributes());
        addOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, addOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            for (String referral : de.getReferralURLs()) {
                result.addReferralURI(referral);
            }
            out.onNext(result);
            out.onComplete();
        }
        result = processExtendedRequest(queueingStrategy, message, opControls, out);
        if (!result && isStartTlsRequest)
        {
          startTLSInProgress.set(false);
        return connectionValid;
    }
    private void disconnectControlsNotAllowed() {
        disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
    }
    /**
     * Processes the provided LDAP message as a bind request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the bind request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processBindRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        BindRequestProtocolOp protocolOp = message.getBindRequestProtocolOp();
        // See if this is an LDAPv2 bind request, and if so whether that
        // should be allowed.
        String versionString;
        switch (ldapVersion = protocolOp.getProtocolVersion()) {
        case 2:
            versionString = "2";
            if (!connectionHandler.allowLDAPv2()) {
                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                        ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
                out.onComplete();
                disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
                return false;
            }
            if (!controls.isEmpty()) {
                // LDAPv2 clients aren't allowed to send controls.
                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                        ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
                out.onComplete();
                disconnectControlsNotAllowed();
                return false;
            }
            break;
        case 3:
            versionString = "3";
            break;
        default:
            // Unsupported protocol version. RFC4511 states that we MUST send
            // a protocol error back to the client.
            out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion).toString()));
            out.onComplete();
            disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion));
            return false;
        }
        return result;
      case OP_TYPE_MODIFY_REQUEST:
        return processModifyRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_MODIFY_DN_REQUEST:
        return processModifyDNRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_SEARCH_REQUEST:
        return processSearchRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_UNBIND_REQUEST:
        return processUnbindRequest(message, opControls);
      default:
        LocalizableMessage msg =
            ERR_LDAP_DISCONNECT_DUE_TO_INVALID_REQUEST_TYPE.get(message
                .getProtocolOpName(), message.getMessageID());
        disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
        ByteString bindDN = protocolOp.getDN();
        BindOperationBasis bindOp;
        switch (protocolOp.getAuthenticationType()) {
        case SIMPLE:
            bindOp = new BindOperationBasis(this, nextOperationID.getAndIncrement(), message.getMessageID(), controls,
                    versionString, bindDN, protocolOp.getSimplePassword());
            break;
        case SASL:
            bindOp = new BindOperationBasis(this, nextOperationID.getAndIncrement(), message.getMessageID(), controls,
                    versionString, bindDN, protocolOp.getSASLMechanism(), protocolOp.getSASLCredentials());
            break;
        default:
            // This is an invalid authentication type, and therefore a
            // protocol error. As per RFC 2251, a protocol error in a bind
            // request must result in terminating the connection.
            LocalizableMessage msg = ERR_LDAP_INVALID_BIND_AUTH_TYPE.get(message.getMessageID(),
                    protocolOp.getAuthenticationType());
            disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
            return false;
        }
        // Add the operation into the work queue.
        bindOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, bindOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newBindResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            for (String referral : de.getReferralURLs()) {
                result.addReferralURI(referral);
            }
            out.onNext(result);
            out.onComplete();
            // If it was a protocol error, then terminate the connection.
            if (de.getResultCode() == ResultCode.PROTOCOL_ERROR) {
                LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_BIND_PROTOCOL_ERROR.get(message.getMessageID(),
                        de.getMessageObject());
                disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
            }
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as a compare request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the compare request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processCompareRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
        }
        CompareRequestProtocolOp protocolOp = message.getCompareRequestProtocolOp();
        CompareOperationBasis compareOp = new CompareOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getAttributeType(),
                protocolOp.getAssertionValue());
        // Add the operation into the work queue.
        compareOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, compareOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final CompareResult result = Responses.newCompareResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            out.onNext(result);
            out.onComplete();
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as a delete request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the delete request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processDeleteRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
        }
        DeleteRequestProtocolOp protocolOp = message.getDeleteRequestProtocolOp();
        DeleteOperationBasis deleteOp = new DeleteOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getDN());
        // Add the operation into the work queue.
        deleteOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, deleteOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            out.onNext(result);
            out.onComplete();
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as an extended request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the extended request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processExtendedRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        // See if this is an LDAPv2 client. If it is, then they should not
        // be issuing extended requests. We can't send a response that we
        // can be sure they can understand, so we have no choice but to
        // close the connection.
        if (ldapVersion == 2) {
            // LDAPv2 clients aren't allowed to send controls.
            LocalizableMessage msg = ERR_LDAPV2_EXTENDED_REQUEST_NOT_ALLOWED.get(getConnectionID(),
                    message.getMessageID());
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(msg.toString()));
            out.onComplete();
            logger.error(msg);
            disconnect(DisconnectReason.PROTOCOL_ERROR, false, msg);
            return false;
        }
        // FIXME -- Do we need to handle certain types of request here?
        // -- StartTLS requests
        // -- Cancel requests
        ExtendedRequestProtocolOp protocolOp = message.getExtendedRequestProtocolOp();
        ExtendedOperationBasis extendedOp = new ExtendedOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getOID(), protocolOp.getValue());
        // Add the operation into the work queue.
        extendedOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, extendedOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
                    .setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            out.onNext(result);
            out.onComplete();
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as a modify request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the modify request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processModifyRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
        }
        ModifyRequestProtocolOp protocolOp = message.getModifyRequestProtocolOp();
        ModifyOperationBasis modifyOp = new ModifyOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getModifications());
        // Add the operation into the work queue.
        modifyOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, modifyOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
                    .setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            out.onNext(result);
            out.onComplete();
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as a modify DN request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the modify DN request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processModifyDNRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
        }
        ModifyDNRequestProtocolOp protocolOp = message.getModifyDNRequestProtocolOp();
        ModifyDNOperationBasis modifyDNOp = new ModifyDNOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getEntryDN(), protocolOp.getNewRDN(),
                protocolOp.deleteOldRDN(), protocolOp.getNewSuperior());
        // Add the operation into the work queue.
        modifyDNOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, modifyDNOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
                    .setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            for (Control control : modifyDNOp.getResponseControls()) {
                result.addControl(Converters.from(control));
            }
            out.onNext(result);
            out.onComplete();
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as a search request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the search request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processSearchRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
        }
        SearchRequestProtocolOp protocolOp = message.getSearchRequestProtocolOp();
        SearchOperationBasis searchOp = new SearchOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getBaseDN(), protocolOp.getScope(),
                protocolOp.getDereferencePolicy(), protocolOp.getSizeLimit(), protocolOp.getTimeLimit(),
                protocolOp.getTypesOnly(), protocolOp.getFilter(), protocolOp.getAttributes());
        // Add the operation into the work queue.
        searchOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, searchOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode());
            if (de.getMessage() != null) {
                result.setDiagnosticMessage(de.getMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
                result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (searchOp.getResponseControls() != null) {
                for (Control control : searchOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
            out.onComplete();
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as an unbind request.
     *
     * @param message
     *            The LDAP message containing the unbind request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processUnbindRequest(final LDAPMessage message, final List<Control> controls) {
        UnbindOperationBasis unbindOp = new UnbindOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls);
        unbindOp.run();
        // The client connection will never be valid after an unbind.
        return false;
      }
    }
    catch (Exception e)
    {
      logger.traceException(e);
      LocalizableMessage msg =
          ERR_LDAP_DISCONNECT_DUE_TO_PROCESSING_FAILURE.get(message
              .getProtocolOpName(), message.getMessageID(), e);
      disconnect(DisconnectReason.SERVER_ERROR, true, msg);
      return false;
    }
  }
  /**
   * Processes the provided LDAP message as an abandon request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the abandon request to
   *          process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processAbandonRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      disconnectControlsNotAllowed();
      return false;
    }
    // Create the abandon operation and add it into the work queue.
    AbandonRequestProtocolOp protocolOp =
        message.getAbandonRequestProtocolOp();
    AbandonOperationBasis abandonOp =
        new AbandonOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getIDToAbandon());
    abandonOp.setAttachment(REACTIVE_OUT, out);
    @Override
    public String getMonitorSummary() {
        StringBuilder buffer = new StringBuilder();
        buffer.append("connID=\"");
        buffer.append(connectionID);
        buffer.append("\" connectTime=\"");
        buffer.append(getConnectTimeString());
        buffer.append("\" source=\"");
        buffer.append(clientAddress);
        buffer.append(":");
        buffer.append(clientPort);
        buffer.append("\" destination=\"");
        buffer.append(serverAddress);
        buffer.append(":");
        buffer.append(connectionHandler.getListeners().iterator().next().getPort());
        buffer.append("\" ldapVersion=\"");
        buffer.append(ldapVersion);
        buffer.append("\" authDN=\"");
    try
    {
      addOperationInProgress(queueingStrategy, abandonOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      // Don't send an error response since abandon operations
      // don't have a response.
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as an add request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the add request to process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processAddRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      // LDAPv2 clients aren't allowed to send controls.
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                          .setDiagnosticMessage(ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
      out.onComplete();
      disconnectControlsNotAllowed();
      return false;
    }
    // Create the add operation and add it into the work queue.
    AddRequestProtocolOp protocolOp = message.getAddRequestProtocolOp();
    AddOperationBasis addOp =
        new AddOperationBasis(this, nextOperationID.getAndIncrement(),
            message.getMessageID(), controls, protocolOp.getDN(),
            protocolOp.getAttributes());
    addOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, addOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result = Responses.newResult(de.getResultCode())
          .setDiagnosticMessage(de.getLocalizedMessage())
          .setMatchedDN(de.getMatchedDN().toString());
      for(String referral : de.getReferralURLs()) {
        result.addReferralURI(referral);
      }
      out.onNext(result);
      out.onComplete();
    }
    return connectionValid;
  }
  private void disconnectControlsNotAllowed()
  {
    disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
  }
  /**
   * Processes the provided LDAP message as a bind request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the bind request to process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processBindRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    BindRequestProtocolOp protocolOp =
        message.getBindRequestProtocolOp();
    // See if this is an LDAPv2 bind request, and if so whether that
    // should be allowed.
    String versionString;
    switch (ldapVersion = protocolOp.getProtocolVersion())
    {
    case 2:
      versionString = "2";
      if (!connectionHandler.allowLDAPv2())
      {
        out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
                            .setDiagnosticMessage(ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
        out.onComplete();
        disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
        return false;
      }
      if (!controls.isEmpty())
      {
        // LDAPv2 clients aren't allowed to send controls.
        out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
            .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
        out.onComplete();
        disconnectControlsNotAllowed();
        return false;
      }
      break;
    case 3:
      versionString = "3";
      break;
    default:
      // Unsupported protocol version. RFC4511 states that we MUST send
      // a protocol error back to the client.
      out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
                          .setDiagnosticMessage(ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion).toString()));
      out.onComplete();
      disconnect(DisconnectReason.PROTOCOL_ERROR, false,
          ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion));
      return false;
    }
    ByteString bindDN = protocolOp.getDN();
    BindOperationBasis bindOp;
    switch (protocolOp.getAuthenticationType())
    {
    case SIMPLE:
      bindOp =
          new BindOperationBasis(this, nextOperationID
              .getAndIncrement(), message.getMessageID(), controls,
              versionString, bindDN, protocolOp.getSimplePassword());
      break;
    case SASL:
      bindOp =
          new BindOperationBasis(this, nextOperationID
              .getAndIncrement(), message.getMessageID(), controls,
              versionString, bindDN, protocolOp.getSASLMechanism(),
              protocolOp.getSASLCredentials());
      break;
    default:
      // This is an invalid authentication type, and therefore a
      // protocol error. As per RFC 2251, a protocol error in a bind
      // request must result in terminating the connection.
      LocalizableMessage msg =
          ERR_LDAP_INVALID_BIND_AUTH_TYPE.get(message.getMessageID(),
              protocolOp.getAuthenticationType());
      disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
      return false;
    }
    // Add the operation into the work queue.
    bindOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, bindOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result = Responses.newBindResult(de.getResultCode())
                                                      .setDiagnosticMessage(de.getLocalizedMessage())
                                                      .setMatchedDN(de.getMatchedDN().toString());
      for(String referral : de.getReferralURLs())
      {
        result.addReferralURI(referral);
      }
      out.onNext(result);
      out.onComplete();
      // If it was a protocol error, then terminate the connection.
      if (de.getResultCode() == ResultCode.PROTOCOL_ERROR)
      {
        LocalizableMessage msg =
            ERR_LDAP_DISCONNECT_DUE_TO_BIND_PROTOCOL_ERROR.get(message
                .getMessageID(), de.getMessageObject());
        disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
      }
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as a compare request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the compare request to
   *          process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processCompareRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      // LDAPv2 clients aren't allowed to send controls.
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                          .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
      out.onComplete();
      disconnectControlsNotAllowed();
      return false;
    }
    CompareRequestProtocolOp protocolOp =
        message.getCompareRequestProtocolOp();
    CompareOperationBasis compareOp =
        new CompareOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getDN(), protocolOp.getAttributeType(),
            protocolOp.getAssertionValue());
    // Add the operation into the work queue.
    compareOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, compareOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final CompareResult result = Responses.newCompareResult(de.getResultCode())
                                            .setDiagnosticMessage(de.getLocalizedMessage())
                                            .setMatchedDN(de.getMatchedDN().toString());
      result.getReferralURIs().addAll(de.getReferralURLs());
      out.onNext(result);
      out.onComplete();
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as a delete request.
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the delete request to process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processDeleteRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      // LDAPv2 clients aren't allowed to send controls.
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                          .setDiagnosticMessage( ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
      out.onComplete();
      disconnectControlsNotAllowed();
      return false;
    }
    DeleteRequestProtocolOp protocolOp =
        message.getDeleteRequestProtocolOp();
    DeleteOperationBasis deleteOp =
        new DeleteOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getDN());
    // Add the operation into the work queue.
    deleteOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, deleteOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result = Responses.newResult(de.getResultCode())
                                     .setDiagnosticMessage(de.getLocalizedMessage())
                                     .setMatchedDN(de.getMatchedDN().toString());
      result.getReferralURIs().addAll(de.getReferralURLs());
      out.onNext(result);
      out.onComplete();
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as an extended request.
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the extended request to
   *          process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processExtendedRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
      final List<Control> controls, final FlowableEmitter<Response> out)
  {
    // See if this is an LDAPv2 client. If it is, then they should not
    // be issuing extended requests. We can't send a response that we
    // can be sure they can understand, so we have no choice but to
    // close the connection.
    if (ldapVersion == 2)
    {
      // LDAPv2 clients aren't allowed to send controls.
      LocalizableMessage msg = ERR_LDAPV2_EXTENDED_REQUEST_NOT_ALLOWED.get(getConnectionID(), message.getMessageID());
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                          .setDiagnosticMessage(msg.toString()));
      out.onComplete();
      logger.error(msg);
      disconnect(DisconnectReason.PROTOCOL_ERROR, false, msg);
      return false;
    }
    // FIXME -- Do we need to handle certain types of request here?
    // -- StartTLS requests
    // -- Cancel requests
    ExtendedRequestProtocolOp protocolOp =
        message.getExtendedRequestProtocolOp();
    ExtendedOperationBasis extendedOp =
        new ExtendedOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getOID(), protocolOp.getValue());
    // Add the operation into the work queue.
    extendedOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, extendedOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result = Responses.newResult(de.getResultCode())
                                     .setDiagnosticMessage(de.getMessage())
                                     .setMatchedDN(de.getMatchedDN().toString());
      result.getReferralURIs().addAll(de.getReferralURLs());
      out.onNext(result);
      out.onComplete();
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as a modify request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the modify request to process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processModifyRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      // LDAPv2 clients aren't allowed to send controls.
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                          .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
      out.onComplete();
      disconnectControlsNotAllowed();
      return false;
    }
    ModifyRequestProtocolOp protocolOp =
        message.getModifyRequestProtocolOp();
    ModifyOperationBasis modifyOp =
        new ModifyOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getDN(), protocolOp.getModifications());
    // Add the operation into the work queue.
    modifyOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, modifyOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result =
          Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage()).setMatchedDN(de.getMatchedDN()
              .toString());
      result.getReferralURIs().addAll(de.getReferralURLs());
      out.onNext(result);
      out.onComplete();
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as a modify DN request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the modify DN request to
   *          process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processModifyDNRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      // LDAPv2 clients aren't allowed to send controls.
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
          .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
      out.onComplete();
      disconnectControlsNotAllowed();
      return false;
    }
    ModifyDNRequestProtocolOp protocolOp =
        message.getModifyDNRequestProtocolOp();
    ModifyDNOperationBasis modifyDNOp =
        new ModifyDNOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getEntryDN(), protocolOp.getNewRDN(), protocolOp
                .deleteOldRDN(), protocolOp.getNewSuperior());
    // Add the operation into the work queue.
    modifyDNOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, modifyDNOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result =
          Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage()).setMatchedDN(de.getMatchedDN()
              .toString());
      result.getReferralURIs().addAll(de.getReferralURLs());
      for(Control control : modifyDNOp.getResponseControls()) {
        result.addControl(Converters.from(control));
      }
      out.onNext(result);
      out.onComplete();
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as a search request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the search request to process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processSearchRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      // LDAPv2 clients aren't allowed to send controls.
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
          .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
      out.onComplete();
      disconnectControlsNotAllowed();
      return false;
    }
    SearchRequestProtocolOp protocolOp =
        message.getSearchRequestProtocolOp();
    SearchOperationBasis searchOp =
        new SearchOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getBaseDN(), protocolOp.getScope(), protocolOp
                .getDereferencePolicy(), protocolOp.getSizeLimit(),
            protocolOp.getTimeLimit(), protocolOp.getTypesOnly(),
            protocolOp.getFilter(), protocolOp.getAttributes());
    // Add the operation into the work queue.
    searchOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, searchOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result = Responses.newResult(de.getResultCode());
      if (de.getMessage() != null)
      {
        result.setDiagnosticMessage(de.getMessage());
      }
      if (de.getMatchedDN() != null)
      {
        result.setMatchedDN(de.getMatchedDN().toString());
      }
      if (de.getReferralURLs() != null)
      {
        result.getReferralURIs().addAll(de.getReferralURLs());
      }
      if (searchOp.getResponseControls() != null)
      {
        for (Control control : searchOp.getResponseControls())
        {
          result.addControl(Converters.from(control));
        DN authDN = getAuthenticationInfo().getAuthenticationDN();
        if (authDN != null) {
            buffer.append(authDN);
        }
      }
      out.onNext(result);
      out.onComplete();
        buffer.append("\" security=\"");
        buffer.append("none");
        buffer.append("\" opsInProgress=\"");
        buffer.append(operationsInProgress.size());
        buffer.append("\"");
        int countPSearch = getPersistentSearches().size();
        if (countPSearch > 0) {
            buffer.append(" persistentSearches=\"");
            buffer.append(countPSearch);
            buffer.append("\"");
        }
        return buffer.toString();
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as an unbind request.
   *
   * @param message
   *          The LDAP message containing the unbind request to process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processUnbindRequest(final LDAPMessage message, final List<Control> controls)
  {
    UnbindOperationBasis unbindOp =
        new UnbindOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls);
    unbindOp.run();
    // The client connection will never be valid after an unbind.
    return false;
  }
  @Override
  public String getMonitorSummary()
  {
    StringBuilder buffer = new StringBuilder();
    buffer.append("connID=\"");
    buffer.append(connectionID);
    buffer.append("\" connectTime=\"");
    buffer.append(getConnectTimeString());
    buffer.append("\" source=\"");
    buffer.append(clientAddress);
    buffer.append(":");
    buffer.append(clientPort);
    buffer.append("\" destination=\"");
    buffer.append(serverAddress);
    buffer.append(":");
    buffer.append(connectionHandler.getListeners().iterator().next().getPort());
    buffer.append("\" ldapVersion=\"");
    buffer.append(ldapVersion);
    buffer.append("\" authDN=\"");
    DN authDN = getAuthenticationInfo().getAuthenticationDN();
    if (authDN != null)
    {
      buffer.append(authDN);
    /**
     * Appends a string representation of this client connection to the provided buffer.
     *
     * @param buffer
     *            The buffer to which the information should be appended.
     */
    @Override
    public void toString(StringBuilder buffer) {
        buffer.append("LDAP client connection from ");
        buffer.append(clientAddress);
        buffer.append(":");
        buffer.append(clientPort);
        buffer.append(" to ");
        buffer.append(serverAddress);
        buffer.append(":");
        buffer.append(serverPort);
    }
    buffer.append("\" security=\"");
    buffer.append("none");
    buffer.append("\" opsInProgress=\"");
    buffer.append(operationsInProgress.size());
    buffer.append("\"");
    int countPSearch = getPersistentSearches().size();
    if (countPSearch > 0)
    {
      buffer.append(" persistentSearches=\"");
      buffer.append(countPSearch);
      buffer.append("\"");
    @Override
    public boolean prepareTLS(LocalizableMessageBuilder unavailableReason) {
        throw new UnsupportedOperationException();
    }
    return buffer.toString();
  }
  /**
   * Appends a string representation of this client connection to the
   * provided buffer.
   *
   * @param buffer
   *          The buffer to which the information should be appended.
   */
  @Override
  public void toString(StringBuilder buffer)
  {
    buffer.append("LDAP client connection from ");
    buffer.append(clientAddress);
    buffer.append(":");
    buffer.append(clientPort);
    buffer.append(" to ");
    buffer.append(serverAddress);
    buffer.append(":");
    buffer.append(serverPort);
  }
  @Override
  public boolean prepareTLS(LocalizableMessageBuilder unavailableReason)
  {
    throw new UnsupportedOperationException();
  }
  /**
   * Retrieves the length of time in milliseconds that this client
   * connection has been idle. <BR>
   * <BR>
   * Note that the default implementation will always return zero.
   * Subclasses associated with connection handlers should override this
   * method if they wish to provided idle time limit functionality.
   *
   * @return The length of time in milliseconds that this client
   *         connection has been idle.
   */
  @Override
  public long getIdleTime()
  {
    if (operationsInProgress.isEmpty()
        && getPersistentSearches().isEmpty())
    {
      return TimeThread.getTime() - lastCompletionTime.get();
    /**
     * Retrieves the length of time in milliseconds that this client connection has been idle. <BR>
     * <BR>
     * Note that the default implementation will always return zero. Subclasses associated with connection handlers
     * should override this method if they wish to provided idle time limit functionality.
     *
     * @return The length of time in milliseconds that this client connection has been idle.
     */
    @Override
    public long getIdleTime() {
        if (operationsInProgress.isEmpty() && getPersistentSearches().isEmpty()) {
            return TimeThread.getTime() - lastCompletionTime.get();
        } else {
            // There's at least one operation in progress, so it's not idle.
            return 0L;
        }
    }
    else
    {
      // There's at least one operation in progress, so it's not idle.
      return 0L;
    /**
     * Return the certificate chain array associated with a connection.
     *
     * @return The array of certificates associated with a connection.
     */
    public Certificate[] getClientCertificateChain() {
        return new Certificate[0];
    }
  }
  /**
   * Return the certificate chain array associated with a connection.
   *
   * @return The array of certificates associated with a connection.
   */
  public Certificate[] getClientCertificateChain()
  {
    return new Certificate[0];
  }
  @Override
  public int getSSF()
  {
    return 0;
  }
    @Override
    public int getSSF() {
        return 0;
    }
}