mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
25.07.2016 464f6dd52a1eeeef0f7b38d4dc1840501818a36f
OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

Some fixes thanks to your comments :-)
4 files deleted
6 files modified
6347 ■■■■■ changed files
opendj-grizzly/pom.xml 4 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java 1 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/SaslFilter.java 25 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java 4 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java 271 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java 3273 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java 1742 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/ProxyToHandler.java 168 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Route.java 61 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/core/LdapEndpointConfigManager.java 798 ●●●●● patch | view | raw | blame | history
opendj-grizzly/pom.xml
@@ -69,10 +69,6 @@
            <artifactId>forgerock-build-tools</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.forgerock.http</groupId>
          <artifactId>chf-http-core</artifactId>
        </dependency>
    </dependencies>
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -118,7 +118,6 @@
            default:
                rawDn = null;
                protocolVersion = -1;
            }
            return LdapMessages.newRawMessage(messageType, messageId, protocolVersion, rawDn,
                    rawDn != null ? decodeOptions.getSchemaResolver().resolveSchema(rawDn) : null, reader);
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/SaslFilter.java
@@ -25,6 +25,7 @@
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.memory.Buffers;
import org.glassfish.grizzly.memory.BuffersBuffer;
import org.glassfish.grizzly.memory.HeapMemoryManager;
final class SaslFilter extends BaseFilter {
@@ -82,17 +83,25 @@
    private Buffer wrap(final FilterChainContext ctx, final Buffer buffer) throws SaslException {
        final SaslServer server = SaslUtils.getSaslServer(ctx.getConnection());
        final Buffer contentBuffer;
        if (buffer.hasArray()) {
            return Buffers.wrap(ctx.getMemoryManager(),
            contentBuffer = Buffers.wrap(ctx.getMemoryManager(),
                    server.wrap(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()));
        }
        final Buffer heapBuffer = toHeapBuffer(buffer, buffer.remaining());
        try {
            return Buffers.wrap(ctx.getMemoryManager(), server.wrap(heapBuffer.array(),
                    heapBuffer.arrayOffset() + heapBuffer.position(), heapBuffer.remaining()));
        } else {
            final Buffer heapBuffer = toHeapBuffer(buffer, buffer.remaining());
            try {
                contentBuffer = Buffers.wrap(
                        ctx.getMemoryManager(),
                        server.wrap(heapBuffer.array(), heapBuffer.arrayOffset() + heapBuffer.position(),
                                heapBuffer.remaining()));
        } finally {
            heapBuffer.dispose();
            } finally {
                heapBuffer.dispose();
            }
        }
        final Buffer headerBuffer = ctx.getMemoryManager().allocate(4);
        headerBuffer.putInt(contentBuffer.limit());
        headerBuffer.flip();
        return BuffersBuffer.create(ctx.getMemoryManager(), headerBuffer, contentBuffer);
    }
}
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java
@@ -139,7 +139,7 @@
        // Use custom search request.
        SearchRequest request = Requests.newSearchRequest(
            "uid=user.0,ou=people,o=test", SearchScope.BASE_OBJECT, "(objectclass=*)", "cn");
//
        InetSocketAddress serverAddress = getServerSocketAddress();
        factories[0][0] = new LDAPConnectionFactory(serverAddress.getHostName(),
                                                    serverAddress.getPort(),
@@ -246,7 +246,7 @@
     *
     * @throws Exception
     */
    @Test(dataProvider = "connectionFactories") //, timeOut = TEST_TIMEOUT_MS)
    @Test(dataProvider = "connectionFactories", timeOut = TEST_TIMEOUT_MS)
    public void testBlockingPromiseNoHandler(ConnectionFactory factory) throws Exception {
        final Promise<? extends Connection, LdapException> promise = factory.getConnectionAsync();
        final Connection con = promise.get();
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
File was deleted
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -105,1015 +105,848 @@
import io.reactivex.FlowableOnSubscribe;
/**
 * This class defines an LDAP client connection, which is a type of
 * client connection that will be accepted by an instance of the LDAP
 * connection handler and have its requests decoded by an LDAP request
 * handler.
 * This class defines an LDAP client connection, which is a type of client connection that will be accepted by an
 * instance of the LDAP connection handler and have its requests decoded by an LDAP request handler.
 */
public final class LDAPClientConnection2 extends ClientConnection implements
    TLSCapableConnection, ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>>
{
  private static final String REACTIVE_OUT = "reactive.out";
public final class LDAPClientConnection2 extends ClientConnection implements TLSCapableConnection,
        ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>> {
    private static final String REACTIVE_OUT = "reactive.out";
/** The tracer object for the debug logger. */
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
    /** The tracer object for the debug logger. */
    private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /** The time that the last operation was completed. */
  private final AtomicLong lastCompletionTime;
  /** The next operation ID that should be used for this connection. */
  private final AtomicLong nextOperationID;
    /** The time that the last operation was completed. */
    private final AtomicLong lastCompletionTime;
    /** The next operation ID that should be used for this connection. */
    private final AtomicLong nextOperationID;
  /**
   * Indicates whether the Directory Server believes this connection to be valid
   * and available for communication.
   */
  private volatile boolean connectionValid;
    /**
     * Indicates whether the Directory Server believes this connection to be valid and available for communication.
     */
    private volatile boolean connectionValid;
  /**
   * Indicates whether this connection is about to be closed. This will be used
   * to prevent accepting new requests while a disconnect is in progress.
   */
  private boolean disconnectRequested;
    /**
     * Indicates whether this connection is about to be closed. This will be used to prevent accepting new requests
     * while a disconnect is in progress.
     */
    private boolean disconnectRequested;
  /**
   * Indicates whether the connection should keep statistics regarding the
   * operations that it is performing.
   */
  private final boolean keepStats;
    /**
     * Indicates whether the connection should keep statistics regarding the operations that it is performing.
     */
    private final boolean keepStats;
  /** The set of all operations currently in progress on this connection. */
  private final ConcurrentHashMap<Integer, Operation> operationsInProgress;
    /** The set of all operations currently in progress on this connection. */
    private final ConcurrentHashMap<Integer, Operation> operationsInProgress;
  /**
   * The number of operations performed on this connection. Used to compare with
   * the resource limits of the network group.
   */
  private final AtomicLong operationsPerformed;
    /**
     * The number of operations performed on this connection. Used to compare with the resource limits of the network
     * group.
     */
    private final AtomicLong operationsPerformed;
  /** The port on the client from which this connection originated. */
  private final int clientPort;
  /** The LDAP version that the client is using to communicate with the server. */
  private int ldapVersion;
  /** The port on the server to which this client has connected. */
  private final int serverPort;
    /** The port on the client from which this connection originated. */
    private final int clientPort;
    /** The LDAP version that the client is using to communicate with the server. */
    private int ldapVersion;
    /** The port on the server to which this client has connected. */
    private final int serverPort;
  /** The reference to the connection handler that accepted this connection. */
  private final LDAPConnectionHandler2 connectionHandler;
  /** The statistics tracker associated with this client connection. */
  private final LDAPStatistics statTracker;
  private final boolean useNanoTime;
    /** The reference to the connection handler that accepted this connection. */
    private final LDAPConnectionHandler2 connectionHandler;
    /** The statistics tracker associated with this client connection. */
    private final LDAPStatistics statTracker;
    private final boolean useNanoTime;
  /** The connection ID assigned to this connection. */
  private final long connectionID;
    /** The connection ID assigned to this connection. */
    private final long connectionID;
  /** The lock used to provide threadsafe access to the set of operations in progress. */
  private final Object opsInProgressLock;
    /** The lock used to provide threadsafe access to the set of operations in progress. */
    private final Object opsInProgressLock;
  /** The socket channel with which this client connection is associated. */
  private final LDAPClientContext clientContext;
    /** The socket channel with which this client connection is associated. */
    private final LDAPClientContext clientContext;
  /** The string representation of the address of the client. */
  private final String clientAddress;
  /** The name of the protocol that the client is using to communicate with the server. */
  private final String protocol;
  /** The string representation of the address of the server to which the client has connected. */
  private final String serverAddress;
    /** The string representation of the address of the client. */
    private final String clientAddress;
    /** The name of the protocol that the client is using to communicate with the server. */
    private final String protocol;
    /** The string representation of the address of the server to which the client has connected. */
    private final String serverAddress;
  /**
   * Creates a new LDAP client connection with the provided information.
   *
   * @param connectionHandler
   *          The connection handler that accepted this connection.
   * @param clientContext
   *          The socket channel that may be used to communicate with
   *          the client.
   * @param  protocol String representing the protocol (LDAP or LDAP+SSL).
   * @throws LdapException
   * @throws DirectoryException If SSL initialisation fails.
   */
  LDAPClientConnection2(LDAPConnectionHandler2 connectionHandler, LDAPClientContext clientContext, String protocol,
          boolean keepStats)
  {
    this.connectionHandler = connectionHandler;
    this.clientContext = clientContext;
    opsInProgressLock = new Object();
    ldapVersion = 3;
    lastCompletionTime = new AtomicLong(TimeThread.getTime());
    nextOperationID = new AtomicLong(0);
    connectionValid = true;
    disconnectRequested = false;
    operationsInProgress = new ConcurrentHashMap<>();
    operationsPerformed = new AtomicLong(0);
    this.keepStats = keepStats;
    this.protocol = protocol;
    /**
     * Creates a new LDAP client connection with the provided information.
     *
     * @param connectionHandler
     *            The connection handler that accepted this connection.
     * @param clientContext
     *            The socket channel that may be used to communicate with the client.
     * @param protocol
     *            String representing the protocol (LDAP or LDAP+SSL).
     * @throws LdapException
     * @throws DirectoryException
     *             If SSL initialisation fails.
     */
    LDAPClientConnection2(LDAPConnectionHandler2 connectionHandler, LDAPClientContext clientContext, String protocol,
            boolean keepStats) {
        this.connectionHandler = connectionHandler;
        this.clientContext = clientContext;
        opsInProgressLock = new Object();
        ldapVersion = 3;
        lastCompletionTime = new AtomicLong(TimeThread.getTime());
        nextOperationID = new AtomicLong(0);
        connectionValid = true;
        disconnectRequested = false;
        operationsInProgress = new ConcurrentHashMap<>();
        operationsPerformed = new AtomicLong(0);
        this.keepStats = keepStats;
        this.protocol = protocol;
    clientAddress = clientContext.getPeerAddress().getAddress().getHostAddress();
    clientPort = clientContext.getPeerAddress().getPort();
    serverAddress = clientContext.getLocalAddress().getAddress().getHostAddress();
    serverPort = clientContext.getLocalAddress().getPort();
        clientAddress = clientContext.getPeerAddress().getAddress().getHostAddress();
        clientPort = clientContext.getPeerAddress().getPort();
        serverAddress = clientContext.getLocalAddress().getAddress().getHostAddress();
        serverPort = clientContext.getLocalAddress().getPort();
    statTracker = this.connectionHandler.getStatTracker();
    if (keepStats)
    {
      statTracker.updateConnect();
      this.useNanoTime = DirectoryServer.getUseNanoTime();
    }
    else
    {
      this.useNanoTime = false;
    }
    connectionID = DirectoryServer.newConnectionAccepted(this);
    clientContext.onDisconnect(new DisconnectListener()
    {
      @Override
      public void exceptionOccurred(LDAPClientContext context, Throwable error)
      {
        if (error instanceof LocalizableException)
        {
          disconnect(DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
        }
        else
        {
          disconnect(DisconnectReason.PROTOCOL_ERROR, true, null);
        }
      }
      @Override
      public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage)
      {
        disconnect(DisconnectReason.SERVER_ERROR, false, null);
      }
      @Override
      public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest)
      {
        disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
      }
    });
  }
  /**
   * Retrieves the connection ID assigned to this connection.
   *
   * @return The connection ID assigned to this connection.
   */
  @Override
  public long getConnectionID()
  {
    return connectionID;
  }
  /**
   * Retrieves the connection handler that accepted this client
   * connection.
   *
   * @return The connection handler that accepted this client
   *         connection.
   */
  @Override
  public ConnectionHandler<?> getConnectionHandler()
  {
    return connectionHandler;
  }
  /**
   * Retrieves the socket channel that can be used to communicate with
   * the client.
   *
   * @return The socket channel that can be used to communicate with the
   *         client.
   */
  @Override
  public SocketChannel getSocketChannel()
  {
    throw new UnsupportedOperationException();
  }
  /**
   * Retrieves the protocol that the client is using to communicate with
   * the Directory Server.
   *
   * @return The protocol that the client is using to communicate with
   *         the Directory Server.
   */
  @Override
  public String getProtocol()
  {
    return protocol;
  }
  /**
   * Retrieves a string representation of the address of the client.
   *
   * @return A string representation of the address of the client.
   */
  @Override
  public String getClientAddress()
  {
    return clientAddress;
  }
  /**
   * Retrieves the port number for this connection on the client system.
   *
   * @return The port number for this connection on the client system.
   */
  @Override
  public int getClientPort()
  {
    return clientPort;
  }
  /**
   * Retrieves a string representation of the address on the server to
   * which the client connected.
   *
   * @return A string representation of the address on the server to
   *         which the client connected.
   */
  @Override
  public String getServerAddress()
  {
    return serverAddress;
  }
  /**
   * Retrieves the port number for this connection on the server system.
   *
   * @return The port number for this connection on the server system.
   */
  @Override
  public int getServerPort()
  {
    return serverPort;
  }
  /**
   * Retrieves the <CODE>java.net.InetAddress</CODE> associated with the
   * remote client system.
   *
   * @return The <CODE>java.net.InetAddress</CODE> associated with the
   *         remote client system. It may be <CODE>null</CODE> if the
   *         client is not connected over an IP-based connection.
   */
  @Override
  public InetAddress getRemoteAddress()
  {
    return clientContext.getPeerAddress().getAddress();
  }
  /**
   * Retrieves the <CODE>java.net.InetAddress</CODE> for the Directory
   * Server system to which the client has established the connection.
   *
   * @return The <CODE>java.net.InetAddress</CODE> for the Directory
   *         Server system to which the client has established the
   *         connection. It may be <CODE>null</CODE> if the client is
   *         not connected over an IP-based connection.
   */
  @Override
  public InetAddress getLocalAddress()
  {
    return clientContext.getLocalAddress().getAddress();
  }
  @Override
  public boolean isConnectionValid()
  {
    return this.connectionValid;
  }
  /**
   * Indicates whether this client connection is currently using a
   * secure mechanism to communicate with the server. Note that this may
   * change over time based on operations performed by the client or
   * server (e.g., it may go from <CODE>false</CODE> to
   * <CODE>true</CODE> if the client uses the StartTLS extended
   * operation).
   *
   * @return <CODE>true</CODE> if the client connection is currently
   *         using a secure mechanism to communicate with the server, or
   *         <CODE>false</CODE> if not.
   */
  @Override
  public boolean isSecure()
  {
    return false;
  }
  /**
   * Sends a response to the client based on the information in the
   * provided operation.
   *
   * @param operation
   *          The operation for which to send the response.
   */
  @Override
  public void sendResponse(Operation operation)
  {
    // Since this is the final response for this operation, we can go
    // ahead and remove it from the "operations in progress" list. It
    // can't be canceled after this point, and this will avoid potential
    // race conditions in which the client immediately sends another
    // request with the same message ID as was used for this operation.
    if (keepStats) {
        long time;
        if (useNanoTime) {
            time = operation.getProcessingNanoTime();
        statTracker = this.connectionHandler.getStatTracker();
        if (keepStats) {
            statTracker.updateConnect();
            this.useNanoTime = DirectoryServer.getUseNanoTime();
        } else {
            time = operation.getProcessingTime();
            this.useNanoTime = false;
        }
        this.statTracker.updateOperationMonitoringData(
                operation.getOperationType(),
                time);
        connectionID = DirectoryServer.newConnectionAccepted(this);
        clientContext.onDisconnect(new DisconnectListener() {
            @Override
            public void exceptionOccurred(LDAPClientContext context, Throwable error) {
                if (error instanceof LocalizableException) {
                    disconnect(
                            DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
                } else {
                    disconnect(DisconnectReason.PROTOCOL_ERROR, true, null);
                }
            }
            @Override
            public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
                    String diagnosticMessage) {
                disconnect(DisconnectReason.SERVER_ERROR, false, null);
            }
            @Override
            public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
                disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
            }
        });
    }
    // Avoid sending the response if one has already been sent. This may happen
    // if operation processing encounters a run-time exception after sending the
    // response: the worker thread exception handling code will attempt to send
    // an error result to the client indicating that a problem occurred.
    if (removeOperationInProgress(operation.getMessageID()))
    {
      final Response response = operationToResponse(operation);
      final FlowableEmitter<Response> out = getOut(operation);
      if (response != null)
      {
    /**
     * Retrieves the connection ID assigned to this connection.
     *
     * @return The connection ID assigned to this connection.
     */
    @Override
    public long getConnectionID() {
        return connectionID;
    }
    /**
     * Retrieves the connection handler that accepted this client connection.
     *
     * @return The connection handler that accepted this client connection.
     */
    @Override
    public ConnectionHandler<?> getConnectionHandler() {
        return connectionHandler;
    }
    /**
     * Retrieves the socket channel that can be used to communicate with the client.
     *
     * @return The socket channel that can be used to communicate with the client.
     */
    @Override
    public SocketChannel getSocketChannel() {
        throw new UnsupportedOperationException();
    }
    /**
     * Retrieves the protocol that the client is using to communicate with the Directory Server.
     *
     * @return The protocol that the client is using to communicate with the Directory Server.
     */
    @Override
    public String getProtocol() {
        return protocol;
    }
    /**
     * Retrieves a string representation of the address of the client.
     *
     * @return A string representation of the address of the client.
     */
    @Override
    public String getClientAddress() {
        return clientAddress;
    }
    /**
     * Retrieves the port number for this connection on the client system.
     *
     * @return The port number for this connection on the client system.
     */
    @Override
    public int getClientPort() {
        return clientPort;
    }
    /**
     * Retrieves a string representation of the address on the server to which the client connected.
     *
     * @return A string representation of the address on the server to which the client connected.
     */
    @Override
    public String getServerAddress() {
        return serverAddress;
    }
    /**
     * Retrieves the port number for this connection on the server system.
     *
     * @return The port number for this connection on the server system.
     */
    @Override
    public int getServerPort() {
        return serverPort;
    }
    /**
     * Retrieves the <CODE>java.net.InetAddress</CODE> associated with the remote client system.
     *
     * @return The <CODE>java.net.InetAddress</CODE> associated with the remote client system. It may be
     *         <CODE>null</CODE> if the client is not connected over an IP-based connection.
     */
    @Override
    public InetAddress getRemoteAddress() {
        return clientContext.getPeerAddress().getAddress();
    }
    /**
     * Retrieves the <CODE>java.net.InetAddress</CODE> for the Directory Server system to which the client has
     * established the connection.
     *
     * @return The <CODE>java.net.InetAddress</CODE> for the Directory Server system to which the client has established
     *         the connection. It may be <CODE>null</CODE> if the client is not connected over an IP-based connection.
     */
    @Override
    public InetAddress getLocalAddress() {
        return clientContext.getLocalAddress().getAddress();
    }
    @Override
    public boolean isConnectionValid() {
        return this.connectionValid;
    }
    /**
     * Indicates whether this client connection is currently using a secure mechanism to communicate with the server.
     * Note that this may change over time based on operations performed by the client or server (e.g., it may go from
     * <CODE>false</CODE> to <CODE>true</CODE> if the client uses the StartTLS extended operation).
     *
     * @return <CODE>true</CODE> if the client connection is currently using a secure mechanism to communicate with the
     *         server, or <CODE>false</CODE> if not.
     */
    @Override
    public boolean isSecure() {
        return false;
    }
    /**
     * Sends a response to the client based on the information in the provided operation.
     *
     * @param operation
     *            The operation for which to send the response.
     */
    @Override
    public void sendResponse(Operation operation) {
        // Since this is the final response for this operation, we can go
        // ahead and remove it from the "operations in progress" list. It
        // can't be canceled after this point, and this will avoid potential
        // race conditions in which the client immediately sends another
        // request with the same message ID as was used for this operation.
        if (keepStats) {
            long time;
            if (useNanoTime) {
                time = operation.getProcessingNanoTime();
            } else {
                time = operation.getProcessingTime();
            }
            this.statTracker.updateOperationMonitoringData(operation.getOperationType(), time);
        }
        // Avoid sending the response if one has already been sent. This may happen
        // if operation processing encounters a run-time exception after sending the
        // response: the worker thread exception handling code will attempt to send
        // an error result to the client indicating that a problem occurred.
        if (removeOperationInProgress(operation.getMessageID())) {
            final Response response = operationToResponse(operation);
            final FlowableEmitter<Response> out = getOut(operation);
            if (response != null) {
                out.onNext(response);
            }
            out.onComplete();
        }
    }
    /**
     * Retrieves an LDAPMessage containing a response generated from the provided operation.
     *
     * @param operation
     *            The operation to use to generate the response LDAPMessage.
     * @return An LDAPMessage containing a response generated from the provided operation.
     */
    private Response operationToResponse(Operation operation) {
        ResultCode resultCode = operation.getResultCode();
        if (resultCode == null) {
            // This must mean that the operation has either not yet completed
            // or that it completed without a result for some reason. In any
            // case, log a message and set the response to "operations error".
            logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_NO_RESULT_CODE, operation.getOperationType(),
                    operation.getConnectionID(), operation.getOperationID());
            resultCode = DirectoryServer.getServerErrorResultCode();
        }
        LocalizableMessageBuilder errorMessage = operation.getErrorMessage();
        String matchedDN = operation.getMatchedDN() != null ? operation.getMatchedDN().toString() : null;
        // Referrals are not allowed for LDAPv2 clients.
        List<String> referralURLs;
        if (ldapVersion == 2) {
            referralURLs = null;
            if (resultCode == ResultCode.REFERRAL) {
                resultCode = ResultCode.CONSTRAINT_VIOLATION;
                errorMessage.append(ERR_LDAPV2_REFERRAL_RESULT_CHANGED.get());
            }
            List<String> opReferrals = operation.getReferralURLs();
            if (opReferrals != null && !opReferrals.isEmpty()) {
                StringBuilder referralsStr = new StringBuilder();
                Iterator<String> iterator = opReferrals.iterator();
                referralsStr.append(iterator.next());
                while (iterator.hasNext()) {
                    referralsStr.append(", ");
                    referralsStr.append(iterator.next());
                }
                errorMessage.append(ERR_LDAPV2_REFERRALS_OMITTED.get(referralsStr));
            }
        } else {
            referralURLs = operation.getReferralURLs();
        }
        final Result result;
        switch (operation.getOperationType()) {
        case ADD:
            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN);
            break;
        case BIND:
            result = Responses.newBindResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN)
                    .setServerSASLCredentials(((BindOperationBasis) operation).getServerSASLCredentials());
            break;
        case COMPARE:
            result = Responses.newCompareResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN);
            break;
        case DELETE:
            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN);
            break;
        case EXTENDED:
            // If this an LDAPv2 client, then we can't send this.
            if (ldapVersion == 2) {
                logger.error(ERR_LDAPV2_SKIPPING_EXTENDED_RESPONSE, getConnectionID(), operation.getOperationID(),
                        operation);
                return null;
            }
            ExtendedOperationBasis extOp = (ExtendedOperationBasis) operation;
            result = Responses.newGenericExtendedResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN).setOID(extOp.getResponseOID()).setValue(extOp.getResponseValue());
            break;
        case MODIFY:
            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN);
            break;
        case MODIFY_DN:
            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN);
            break;
        case SEARCH:
            result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString())
                    .setMatchedDN(matchedDN);
            break;
        default:
            // This must be a type of operation that doesn't have a response.
            // This shouldn't happen, so log a message and return.
            logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_INVALID_OP, operation.getOperationType(), getConnectionID(),
                    operation.getOperationID(), operation);
            return null;
        }
        if (referralURLs != null) {
            result.getReferralURIs().addAll(referralURLs);
        }
        // Controls are not allowed for LDAPv2 clients.
        if (ldapVersion != 2) {
            for (Control control : operation.getResponseControls()) {
                result.addControl(Converters.from(control));
            }
        }
        return result;
    }
    /**
     * Sends the provided search result entry to the client.
     *
     * @param searchOperation
     *            The search operation with which the entry is associated
     * @param searchEntry
     *            The search result entry to be sent to the client
     */
    @Override
    public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry) {
        getEmitter(searchOperation).onNext(toResponse(searchEntry));
    }
    private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation) {
        return getOut(searchOperation);
    }
    private Response toResponse(SearchResultEntry searchEntry) {
        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
    }
    private FlowableEmitter<Response> getOut(Operation operation) {
        return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
    }
    /**
     * Sends the provided search result reference to the client.
     *
     * @param searchOperation
     *            The search operation with which the reference is associated.
     * @param searchReference
     *            The search result reference to be sent to the client.
     * @return <CODE>true</CODE> if the client is able to accept referrals, or <CODE>false</CODE> if the client cannot
     *         handle referrals and no more attempts should be made to send them for the associated search operation.
     */
    @Override
    public boolean sendSearchReference(SearchOperation searchOperation, SearchResultReference searchReference) {
        // Make sure this is not an LDAPv2 client. If it is, then they can't
        // see referrals so we'll not send anything. Also, throw an
        // exception so that the core server will know not to try sending
        // any more referrals to this client for the rest of the operation.
        if (ldapVersion == 2) {
            logger.error(ERR_LDAPV2_SKIPPING_SEARCH_REFERENCE, getConnectionID(), searchOperation.getOperationID(),
                    searchReference);
            return false;
        }
        final FlowableEmitter<Response> out = getOut(searchOperation);
        out.onNext(Converters.from(searchReference));
        return true;
    }
    /**
     * Sends the provided intermediate response message to the client.
     *
     * @param intermediateResponse
     *            The intermediate response message to be sent.
     * @return <CODE>true</CODE> if processing on the associated operation should continue, or <CODE>false</CODE> if
     *         not.
     */
    @Override
    protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse) {
        final Operation operation = intermediateResponse.getOperation();
        final FlowableEmitter<Response> out = getOut(operation);
        final Response response = Responses.newGenericIntermediateResponse(intermediateResponse.getOID(),
                intermediateResponse.getValue());
        for (Control control : intermediateResponse.getControls()) {
            response.addControl(Converters.from(control));
        }
        out.onNext(response);
      }
      out.onComplete();
    }
  }
  /**
   * Retrieves an LDAPMessage containing a response generated from the
   * provided operation.
   *
   * @param operation
   *          The operation to use to generate the response LDAPMessage.
   * @return An LDAPMessage containing a response generated from the
   *         provided operation.
   */
  private Response operationToResponse(Operation operation)
  {
    ResultCode resultCode = operation.getResultCode();
    if (resultCode == null)
    {
      // This must mean that the operation has either not yet completed
      // or that it completed without a result for some reason. In any
      // case, log a message and set the response to "operations error".
      logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_NO_RESULT_CODE, operation.getOperationType(),
          operation.getConnectionID(), operation.getOperationID());
      resultCode = DirectoryServer.getServerErrorResultCode();
        // The only reason we shouldn't continue processing is if the
        // connection is closed.
        return connectionValid;
    }
    LocalizableMessageBuilder errorMessage = operation.getErrorMessage();
    String matchedDN = operation.getMatchedDN() != null ? operation.getMatchedDN().toString() : null;
    // Referrals are not allowed for LDAPv2 clients.
    List<String> referralURLs;
    if (ldapVersion == 2)
    {
      referralURLs = null;
      if (resultCode == ResultCode.REFERRAL)
      {
        resultCode = ResultCode.CONSTRAINT_VIOLATION;
        errorMessage.append(ERR_LDAPV2_REFERRAL_RESULT_CHANGED.get());
      }
      List<String> opReferrals = operation.getReferralURLs();
      if (opReferrals != null && !opReferrals.isEmpty())
      {
        StringBuilder referralsStr = new StringBuilder();
        Iterator<String> iterator = opReferrals.iterator();
        referralsStr.append(iterator.next());
        while (iterator.hasNext())
        {
          referralsStr.append(", ");
          referralsStr.append(iterator.next());
        }
        errorMessage.append(ERR_LDAPV2_REFERRALS_OMITTED.get(referralsStr));
      }
    }
    else
    {
      referralURLs = operation.getReferralURLs();
    }
    final Result result;
    switch (operation.getOperationType())
    {
    case ADD:
      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
      break;
    case BIND:
      result = Responses.newBindResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN)
                        .setServerSASLCredentials(((BindOperationBasis) operation).getServerSASLCredentials());
      break;
    case COMPARE:
      result = Responses.newCompareResult(resultCode)
                        .setDiagnosticMessage(errorMessage.toString())
                        .setMatchedDN(matchedDN);
      break;
    case DELETE:
      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
      break;
    case EXTENDED:
      // If this an LDAPv2 client, then we can't send this.
      if (ldapVersion == 2)
      {
        logger.error(ERR_LDAPV2_SKIPPING_EXTENDED_RESPONSE,
            getConnectionID(), operation.getOperationID(), operation);
        return null;
      }
      ExtendedOperationBasis extOp = (ExtendedOperationBasis) operation;
      result = Responses.newGenericExtendedResult(resultCode)
                        .setDiagnosticMessage(errorMessage.toString())
                        .setMatchedDN(matchedDN)
                        .setOID(extOp.getResponseOID()).setValue(extOp.getResponseValue());
      break;
    case MODIFY:
      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
      break;
    case MODIFY_DN:
      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
      break;
    case SEARCH:
      result = Responses.newResult(resultCode).setDiagnosticMessage(errorMessage.toString()).setMatchedDN(matchedDN);
      break;
    default:
      // This must be a type of operation that doesn't have a response.
      // This shouldn't happen, so log a message and return.
      logger.error(ERR_LDAP_CLIENT_SEND_RESPONSE_INVALID_OP, operation.getOperationType(), getConnectionID(),
          operation.getOperationID(), operation);
      return null;
    }
    if (referralURLs != null)
    {
      result.getReferralURIs().addAll(referralURLs);
    }
    // Controls are not allowed for LDAPv2 clients.
    if (ldapVersion != 2)
    {
      for(Control control : operation.getResponseControls()) {
        result.addControl(Converters.from(control));
      }
    }
    return result;
  }
  /**
   * Sends the provided search result entry to the client.
   *
   * @param searchOperation
   *          The search operation with which the entry is associated
   * @param searchEntry
   *          The search result entry to be sent to the client
   */
  @Override
  public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry)
  {
    getEmitter(searchOperation).onNext(toResponse(searchEntry));
  }
  private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation)
  {
    return getOut(searchOperation);
  }
  private Response toResponse(SearchResultEntry searchEntry)
  {
    return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
  }
  private FlowableEmitter<Response> getOut(Operation operation)
  {
    return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
  }
  /**
   * Sends the provided search result reference to the client.
   *
   * @param searchOperation
   *          The search operation with which the reference is
   *          associated.
   * @param searchReference
   *          The search result reference to be sent to the client.
   * @return <CODE>true</CODE> if the client is able to accept
   *         referrals, or <CODE>false</CODE> if the client cannot
   *         handle referrals and no more attempts should be made to
   *         send them for the associated search operation.
   */
  @Override
  public boolean sendSearchReference(SearchOperation searchOperation, SearchResultReference searchReference)
  {
    // Make sure this is not an LDAPv2 client. If it is, then they can't
    // see referrals so we'll not send anything. Also, throw an
    // exception so that the core server will know not to try sending
    // any more referrals to this client for the rest of the operation.
    if (ldapVersion == 2)
    {
      logger.error(ERR_LDAPV2_SKIPPING_SEARCH_REFERENCE, getConnectionID(),
              searchOperation.getOperationID(), searchReference);
      return false;
    }
    final FlowableEmitter<Response> out = getOut(searchOperation);
    out.onNext(Converters.from(searchReference));
    return true;
  }
  /**
   * Sends the provided intermediate response message to the client.
   *
   * @param intermediateResponse
   *          The intermediate response message to be sent.
   * @return <CODE>true</CODE> if processing on the associated operation
   *         should continue, or <CODE>false</CODE> if not.
   */
  @Override
  protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse)
  {
    final Operation operation = intermediateResponse.getOperation();
    final FlowableEmitter<Response> out = getOut(operation);
    final Response response =
        Responses.newGenericIntermediateResponse(intermediateResponse.getOID(), intermediateResponse.getValue());
    for (Control control : intermediateResponse.getControls())
    {
      response.addControl(Converters.from(control));
    }
    out.onNext(response);
    // The only reason we shouldn't continue processing is if the
    // connection is closed.
    return connectionValid;
  }
  /**
   * Closes the connection to the client, optionally sending it a
   * message indicating the reason for the closure. Note that the
   * ability to send a notice of disconnection may not be available for
   * all protocols or under all circumstances.
   *
   * @param disconnectReason
   *          The disconnect reason that provides the generic cause for
   *          the disconnect.
   * @param sendNotification
   *          Indicates whether to try to provide notification to the
   *          client that the connection will be closed.
   * @param message
   *          The message to include in the disconnect notification
   *          response. It may be <CODE>null</CODE> if no message is to
   *          be sent.
   */
  @Override
  public void disconnect(DisconnectReason disconnectReason,
      boolean sendNotification, LocalizableMessage message)
  {
    // Set a flag indicating that the connection is being terminated so
    // that no new requests will be accepted. Also cancel all operations
    // in progress.
    synchronized (opsInProgressLock)
    {
      // If we are already in the middle of a disconnect, then don't
      // do anything.
      if (disconnectRequested)
      {
        return;
      }
      disconnectRequested = true;
    }
    if (keepStats)
    {
      statTracker.updateDisconnect();
    }
    if (connectionID >= 0)
    {
      DirectoryServer.connectionClosed(this);
    }
    // Indicate that this connection is no longer valid.
    connectionValid = false;
    final LocalizableMessage cancelMessage;
    if (message != null)
    {
      cancelMessage = new LocalizableMessageBuilder()
          .append(disconnectReason.getClosureMessage())
          .append(": ")
          .append(message)
          .toMessage();
    }
    else
    {
      cancelMessage = disconnectReason.getClosureMessage();
    }
    cancelAllOperations(new CancelRequest(true, cancelMessage));
    finalizeConnectionInternal();
    // See if we should send a notification to the client. If so, then
    // construct and send a notice of disconnection unsolicited
    // response. Note that we cannot send this notification to an LDAPv2 client.
    if (sendNotification && ldapVersion != 2)
    {
      try
      {
        LocalizableMessage errMsg = message != null ? message : INFO_LDAP_CLIENT_GENERIC_NOTICE_OF_DISCONNECTION.get();
        clientContext.disconnect(ResultCode.valueOf(toResultCode(disconnectReason)), errMsg.toString());
      }
      catch (Exception e)
      {
        // NYI -- Log a message indicating that we couldn't send the
        // notice of disconnection.
        logger.traceException(e);
      }
    }
    else
    {
        clientContext.disconnect();
    }
    // NYI -- Deregister the client connection from any server components that
    // might know about it.
    logDisconnect(this, disconnectReason, message);
    try
    {
      PluginConfigManager pluginManager = DirectoryServer.getPluginConfigManager();
      pluginManager.invokePostDisconnectPlugins(this, disconnectReason, message);
    }
    catch (Exception e)
    {
      logger.traceException(e);
    }
  }
  private int toResultCode(DisconnectReason disconnectReason)
  {
    switch (disconnectReason)
    {
    case PROTOCOL_ERROR:
      return LDAPResultCode.PROTOCOL_ERROR;
    case SERVER_SHUTDOWN:
      return LDAPResultCode.UNAVAILABLE;
    case SERVER_ERROR:
      return DirectoryServer.getServerErrorResultCode().intValue();
    case ADMIN_LIMIT_EXCEEDED:
    case IDLE_TIME_LIMIT_EXCEEDED:
    case MAX_REQUEST_SIZE_EXCEEDED:
    case IO_TIMEOUT:
      return LDAPResultCode.ADMIN_LIMIT_EXCEEDED;
    case CONNECTION_REJECTED:
      return LDAPResultCode.CONSTRAINT_VIOLATION;
    case INVALID_CREDENTIALS:
      return LDAPResultCode.INVALID_CREDENTIALS;
    default:
      return LDAPResultCode.OTHER;
    }
  }
  /**
   * Retrieves the set of operations in progress for this client
   * connection. This list must not be altered by any caller.
   *
   * @return The set of operations in progress for this client
   *         connection.
   */
  @Override
  public Collection<Operation> getOperationsInProgress()
  {
    return operationsInProgress.values();
  }
  /**
   * Retrieves the operation in progress with the specified message ID.
   *
   * @param messageID
   *          The message ID for the operation to retrieve.
   * @return The operation in progress with the specified message ID, or
   *         <CODE>null</CODE> if no such operation could be found.
   */
  @Override
  public Operation getOperationInProgress(int messageID)
  {
    return operationsInProgress.get(messageID);
  }
  /**
   * Adds the provided operation to the set of operations in progress
   * for this client connection.
   *
   * @param operation
   *          The operation to add to the set of operations in progress
   *          for this client connection.
   * @throws DirectoryException
   *           If the operation is not added for some reason (e.g., the
   *           client already has reached the maximum allowed concurrent
   *           requests).
   */
  private void addOperationInProgress(final QueueingStrategy queueingStrategy, Operation operation)
      throws DirectoryException
  {
    int messageID = operation.getMessageID();
    // We need to grab a lock to ensure that no one else can add
    // operations to the queue while we are performing some preliminary
    // checks.
    try
    {
      synchronized (opsInProgressLock)
      {
        // If we're already in the process of disconnecting the client,
        // then reject the operation.
        if (disconnectRequested)
        {
          LocalizableMessage message = WARN_CLIENT_DISCONNECT_IN_PROGRESS.get();
          throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
              message);
        }
        // Add the operation to the list of operations in progress for
        // this connection.
        Operation op = operationsInProgress.putIfAbsent(messageID, operation);
        // See if there is already an operation in progress with the
        // same message ID. If so, then we can't allow it.
        if (op != null)
        {
          LocalizableMessage message =
            WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
          throw new DirectoryException(ResultCode.PROTOCOL_ERROR,
              message);
        }
      }
      // Try to add the operation to the work queue,
      // or run it synchronously (typically for the administration
      // connector)
      queueingStrategy.enqueueRequest(operation);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      operationsInProgress.remove(messageID);
      lastCompletionTime.set(TimeThread.getTime());
      throw de;
    }
    catch (Exception e)
    {
      logger.traceException(e);
      LocalizableMessage message =
        WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
      throw new DirectoryException(DirectoryServer
          .getServerErrorResultCode(), message, e);
    }
  }
  /**
   * Removes the provided operation from the set of operations in
   * progress for this client connection. Note that this does not make
   * any attempt to cancel any processing that may already be in
   * progress for the operation.
   *
   * @param messageID
   *          The message ID of the operation to remove from the set of
   *          operations in progress.
   * @return <CODE>true</CODE> if the operation was found and removed
   *         from the set of operations in progress, or
   *         <CODE>false</CODE> if not.
   */
  @Override
  public boolean removeOperationInProgress(int messageID)
  {
    Operation operation = operationsInProgress.remove(messageID);
    if (operation == null)
    {
      return false;
    }
    if (operation.getOperationType() == OperationType.ABANDON
        && keepStats
        && operation.getResultCode() == ResultCode.CANCELLED)
    {
      statTracker.updateAbandonedOperation();
    }
    lastCompletionTime.set(TimeThread.getTime());
    return true;
  }
  /**
   * Attempts to cancel the specified operation.
   *
   * @param messageID
   *          The message ID of the operation to cancel.
   * @param cancelRequest
   *          An object providing additional information about how the
   *          cancel should be processed.
   * @return A cancel result that either indicates that the cancel was
   *         successful or provides a reason that it was not.
   */
  @Override
  public CancelResult cancelOperation(int messageID,
      CancelRequest cancelRequest)
  {
    Operation op = operationsInProgress.get(messageID);
    if (op != null)
    {
      return op.cancel(cancelRequest);
    }
    // See if the operation is in the list of persistent searches.
    for (PersistentSearch ps : getPersistentSearches())
    {
      if (ps.getMessageID() == messageID)
      {
        // We only need to find the first persistent search
        // associated with the provided message ID. The persistent search
        // will ensure that all other related persistent searches are cancelled.
        return ps.cancel();
      }
    }
    return new CancelResult(ResultCode.NO_SUCH_OPERATION, null);
  }
  /**
   * Attempts to cancel all operations in progress on this connection.
   *
   * @param cancelRequest
   *          An object providing additional information about how the
   *          cancel should be processed.
   */
  @Override
  public void cancelAllOperations(CancelRequest cancelRequest)
  {
    // Make sure that no one can add any new operations.
    synchronized (opsInProgressLock)
    {
      try
      {
        for (Operation o : operationsInProgress.values())
        {
          try
          {
            o.abort(cancelRequest);
            // TODO: Assume its cancelled?
            if (keepStats)
            {
              statTracker.updateAbandonedOperation();
    /**
     * Closes the connection to the client, optionally sending it a message indicating the reason for the closure. Note
     * that the ability to send a notice of disconnection may not be available for all protocols or under all
     * circumstances.
     *
     * @param disconnectReason
     *            The disconnect reason that provides the generic cause for the disconnect.
     * @param sendNotification
     *            Indicates whether to try to provide notification to the client that the connection will be closed.
     * @param message
     *            The message to include in the disconnect notification response. It may be <CODE>null</CODE> if no
     *            message is to be sent.
     */
    @Override
    public void disconnect(DisconnectReason disconnectReason, boolean sendNotification, LocalizableMessage message) {
        // Set a flag indicating that the connection is being terminated so
        // that no new requests will be accepted. Also cancel all operations
        // in progress.
        synchronized (opsInProgressLock) {
            // If we are already in the middle of a disconnect, then don't
            // do anything.
            if (disconnectRequested) {
                return;
            }
          }
          catch (Exception e)
          {
            disconnectRequested = true;
        }
        if (keepStats) {
            statTracker.updateDisconnect();
        }
        if (connectionID >= 0) {
            DirectoryServer.connectionClosed(this);
        }
        // Indicate that this connection is no longer valid.
        connectionValid = false;
        final LocalizableMessage cancelMessage;
        if (message != null) {
            cancelMessage = new LocalizableMessageBuilder().append(disconnectReason.getClosureMessage()).append(": ")
                    .append(message).toMessage();
        } else {
            cancelMessage = disconnectReason.getClosureMessage();
        }
        cancelAllOperations(new CancelRequest(true, cancelMessage));
        finalizeConnectionInternal();
        // See if we should send a notification to the client. If so, then
        // construct and send a notice of disconnection unsolicited
        // response. Note that we cannot send this notification to an LDAPv2 client.
        if (sendNotification && ldapVersion != 2) {
            try {
                LocalizableMessage errMsg = message != null ? message
                        : INFO_LDAP_CLIENT_GENERIC_NOTICE_OF_DISCONNECTION.get();
                clientContext.disconnect(ResultCode.valueOf(toResultCode(disconnectReason)), errMsg.toString());
            } catch (Exception e) {
                // NYI -- Log a message indicating that we couldn't send the
                // notice of disconnection.
                logger.traceException(e);
            }
        } else {
            clientContext.disconnect();
        }
        // NYI -- Deregister the client connection from any server components that
        // might know about it.
        logDisconnect(this, disconnectReason, message);
        try {
            PluginConfigManager pluginManager = DirectoryServer.getPluginConfigManager();
            pluginManager.invokePostDisconnectPlugins(this, disconnectReason, message);
        } catch (Exception e) {
            logger.traceException(e);
          }
        }
        if (!operationsInProgress.isEmpty()
            || !getPersistentSearches().isEmpty())
        {
          lastCompletionTime.set(TimeThread.getTime());
        }
        operationsInProgress.clear();
        for (PersistentSearch persistentSearch : getPersistentSearches())
        {
          persistentSearch.cancel();
        }
      }
      catch (Exception e)
      {
        logger.traceException(e);
      }
    }
  }
  /**
   * Attempts to cancel all operations in progress on this connection
   * except the operation with the specified message ID.
   *
   * @param cancelRequest
   *          An object providing additional information about how the
   *          cancel should be processed.
   * @param messageID
   *          The message ID of the operation that should not be
   *          canceled.
   */
  @Override
  public void cancelAllOperationsExcept(CancelRequest cancelRequest,
      int messageID)
  {
    // Make sure that no one can add any new operations.
    synchronized (opsInProgressLock)
    {
      try
      {
        for (int msgID : operationsInProgress.keySet())
        {
          if (msgID == messageID)
          {
            continue;
          }
          Operation o = operationsInProgress.get(msgID);
          if (o != null)
          {
            try
            {
              o.abort(cancelRequest);
              // TODO: Assume its cancelled?
              if (keepStats)
              {
                statTracker.updateAbandonedOperation();
              }
            }
            catch (Exception e)
            {
              logger.traceException(e);
            }
          }
          operationsInProgress.remove(msgID);
          lastCompletionTime.set(TimeThread.getTime());
    private int toResultCode(DisconnectReason disconnectReason) {
        switch (disconnectReason) {
        case PROTOCOL_ERROR:
            return LDAPResultCode.PROTOCOL_ERROR;
        case SERVER_SHUTDOWN:
            return LDAPResultCode.UNAVAILABLE;
        case SERVER_ERROR:
            return DirectoryServer.getServerErrorResultCode().intValue();
        case ADMIN_LIMIT_EXCEEDED:
        case IDLE_TIME_LIMIT_EXCEEDED:
        case MAX_REQUEST_SIZE_EXCEEDED:
        case IO_TIMEOUT:
            return LDAPResultCode.ADMIN_LIMIT_EXCEEDED;
        case CONNECTION_REJECTED:
            return LDAPResultCode.CONSTRAINT_VIOLATION;
        case INVALID_CREDENTIALS:
            return LDAPResultCode.INVALID_CREDENTIALS;
        default:
            return LDAPResultCode.OTHER;
        }
        for (PersistentSearch persistentSearch : getPersistentSearches())
        {
          if (persistentSearch.getMessageID() == messageID)
          {
            continue;
          }
          persistentSearch.cancel();
          lastCompletionTime.set(TimeThread.getTime());
        }
      }
      catch (Exception e)
      {
        logger.traceException(e);
      }
    }
  }
  @Override
  public Selector getWriteSelector()
  {
    throw new UnsupportedOperationException();
  }
    /**
     * Retrieves the set of operations in progress for this client connection. This list must not be altered by any
     * caller.
     *
     * @return The set of operations in progress for this client connection.
     */
    @Override
    public Collection<Operation> getOperationsInProgress() {
        return operationsInProgress.values();
    }
  @Override
  public long getMaxBlockedWriteTimeLimit()
  {
    return connectionHandler.getMaxBlockedWriteTimeLimit();
  }
    /**
     * Retrieves the operation in progress with the specified message ID.
     *
     * @param messageID
     *            The message ID for the operation to retrieve.
     * @return The operation in progress with the specified message ID, or <CODE>null</CODE> if no such operation could
     *         be found.
     */
    @Override
    public Operation getOperationInProgress(int messageID) {
        return operationsInProgress.get(messageID);
    }
  /**
   * Returns the total number of operations initiated on this
   * connection.
   *
   * @return the total number of operations on this connection
   */
  @Override
  public long getNumberOfOperations()
  {
    return operationsPerformed.get();
  }
    /**
     * Adds the provided operation to the set of operations in progress for this client connection.
     *
     * @param operation
     *            The operation to add to the set of operations in progress for this client connection.
     * @throws DirectoryException
     *             If the operation is not added for some reason (e.g., the client already has reached the maximum
     *             allowed concurrent requests).
     */
    private void addOperationInProgress(final QueueingStrategy queueingStrategy, Operation operation)
            throws DirectoryException {
        int messageID = operation.getMessageID();
  /**
   * Processes the provided LDAP message read from the client and takes
   * whatever action is appropriate. For most requests, this will
   * include placing the operation in the work queue. Certain requests
   * (in particular, abandons and unbinds) will be processed directly.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message to process.
   * @return <CODE>true</CODE> if the appropriate action was taken for
   *         the request, or <CODE>false</CODE> if there was a fatal
   *         error and the client has been disconnected as a result, or
   *         if the client unbound from the server.
   */
        // We need to grab a lock to ensure that no one else can add
        // operations to the queue while we are performing some preliminary
        // checks.
        try {
            synchronized (opsInProgressLock) {
                // If we're already in the process of disconnecting the client,
                // then reject the operation.
                if (disconnectRequested) {
                    LocalizableMessage message = WARN_CLIENT_DISCONNECT_IN_PROGRESS.get();
                    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
                }
                // Add the operation to the list of operations in progress for
                // this connection.
                Operation op = operationsInProgress.putIfAbsent(messageID, operation);
                // See if there is already an operation in progress with the
                // same message ID. If so, then we can't allow it.
                if (op != null) {
                    LocalizableMessage message = WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
                    throw new DirectoryException(ResultCode.PROTOCOL_ERROR, message);
                }
            }
            // Try to add the operation to the work queue,
            // or run it synchronously (typically for the administration
            // connector)
            queueingStrategy.enqueueRequest(operation);
        } catch (DirectoryException de) {
            logger.traceException(de);
            operationsInProgress.remove(messageID);
            lastCompletionTime.set(TimeThread.getTime());
            throw de;
        } catch (Exception e) {
            logger.traceException(e);
            LocalizableMessage message = WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
            throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message, e);
        }
    }
    /**
     * Removes the provided operation from the set of operations in progress for this client connection. Note that this
     * does not make any attempt to cancel any processing that may already be in progress for the operation.
     *
     * @param messageID
     *            The message ID of the operation to remove from the set of operations in progress.
     * @return <CODE>true</CODE> if the operation was found and removed from the set of operations in progress, or
     *         <CODE>false</CODE> if not.
     */
    @Override
    public boolean removeOperationInProgress(int messageID) {
        Operation operation = operationsInProgress.remove(messageID);
        if (operation == null) {
            return false;
        }
        if (operation.getOperationType() == OperationType.ABANDON && keepStats
                && operation.getResultCode() == ResultCode.CANCELLED) {
            statTracker.updateAbandonedOperation();
        }
        lastCompletionTime.set(TimeThread.getTime());
        return true;
    }
    /**
     * Attempts to cancel the specified operation.
     *
     * @param messageID
     *            The message ID of the operation to cancel.
     * @param cancelRequest
     *            An object providing additional information about how the cancel should be processed.
     * @return A cancel result that either indicates that the cancel was successful or provides a reason that it was
     *         not.
     */
    @Override
    public CancelResult cancelOperation(int messageID, CancelRequest cancelRequest) {
        Operation op = operationsInProgress.get(messageID);
        if (op != null) {
            return op.cancel(cancelRequest);
        }
        // See if the operation is in the list of persistent searches.
        for (PersistentSearch ps : getPersistentSearches()) {
            if (ps.getMessageID() == messageID) {
                // We only need to find the first persistent search
                // associated with the provided message ID. The persistent search
                // will ensure that all other related persistent searches are cancelled.
                return ps.cancel();
            }
        }
        return new CancelResult(ResultCode.NO_SUCH_OPERATION, null);
    }
    /**
     * Attempts to cancel all operations in progress on this connection.
     *
     * @param cancelRequest
     *            An object providing additional information about how the cancel should be processed.
     */
    @Override
    public void cancelAllOperations(CancelRequest cancelRequest) {
        // Make sure that no one can add any new operations.
        synchronized (opsInProgressLock) {
            try {
                for (Operation o : operationsInProgress.values()) {
                    try {
                        o.abort(cancelRequest);
                        // TODO: Assume its cancelled?
                        if (keepStats) {
                            statTracker.updateAbandonedOperation();
                        }
                    } catch (Exception e) {
                        logger.traceException(e);
                    }
                }
                if (!operationsInProgress.isEmpty() || !getPersistentSearches().isEmpty()) {
                    lastCompletionTime.set(TimeThread.getTime());
                }
                operationsInProgress.clear();
                for (PersistentSearch persistentSearch : getPersistentSearches()) {
                    persistentSearch.cancel();
                }
            } catch (Exception e) {
                logger.traceException(e);
            }
        }
    }
    /**
     * Attempts to cancel all operations in progress on this connection except the operation with the specified message
     * ID.
     *
     * @param cancelRequest
     *            An object providing additional information about how the cancel should be processed.
     * @param messageID
     *            The message ID of the operation that should not be canceled.
     */
    @Override
    public void cancelAllOperationsExcept(CancelRequest cancelRequest, int messageID) {
        // Make sure that no one can add any new operations.
        synchronized (opsInProgressLock) {
            try {
                for (int msgID : operationsInProgress.keySet()) {
                    if (msgID == messageID) {
                        continue;
                    }
                    Operation o = operationsInProgress.get(msgID);
                    if (o != null) {
                        try {
                            o.abort(cancelRequest);
                            // TODO: Assume its cancelled?
                            if (keepStats) {
                                statTracker.updateAbandonedOperation();
                            }
                        } catch (Exception e) {
                            logger.traceException(e);
                        }
                    }
                    operationsInProgress.remove(msgID);
                    lastCompletionTime.set(TimeThread.getTime());
                }
                for (PersistentSearch persistentSearch : getPersistentSearches()) {
                    if (persistentSearch.getMessageID() == messageID) {
                        continue;
                    }
                    persistentSearch.cancel();
                    lastCompletionTime.set(TimeThread.getTime());
                }
            } catch (Exception e) {
                logger.traceException(e);
            }
        }
    }
    @Override
    public Selector getWriteSelector() {
        throw new UnsupportedOperationException();
    }
    @Override
    public long getMaxBlockedWriteTimeLimit() {
        return connectionHandler.getMaxBlockedWriteTimeLimit();
    }
    /**
     * Returns the total number of operations initiated on this connection.
     *
     * @return the total number of operations on this connection
     */
    @Override
    public long getNumberOfOperations() {
        return operationsPerformed.get();
    }
    /**
     * Processes the provided LDAP message read from the client and takes whatever action is appropriate. For most
     * requests, this will include placing the operation in the work queue. Certain requests (in particular, abandons
     * and unbinds) will be processed directly.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message to process.
     * @return <CODE>true</CODE> if the appropriate action was taken for the request, or <CODE>false</CODE> if there was
     *         a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
     */
    @Override
    public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
        return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
@@ -1124,855 +957,693 @@
        }, BackpressureMode.NONE).onBackpressureBuffer(64, null, BackpressureOverflowStrategy.ERROR)));
    }
  private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final FlowableEmitter<Response> out)
  {
    if (keepStats)
    {
      statTracker.updateMessageRead(message);
    private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final FlowableEmitter<Response> out) {
        if (keepStats) {
            statTracker.updateMessageRead(message);
        }
        operationsPerformed.getAndIncrement();
        List<Control> opControls = message.getControls();
        // FIXME -- See if there is a bind in progress. If so, then deny
        // most kinds of operations.
        // Figure out what type of operation we're dealing with based on the
        // LDAP message. Abandon and unbind requests will be processed here.
        // All other types of requests will be encapsulated into operations
        // and append into the work queue to be picked up by a worker
        // thread. Any other kinds of LDAP messages (e.g., response
        // messages) are illegal and will result in the connection being
        // terminated.
        try {
            if (bindInProgress.get()) {
                throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_BIND_IN_PROGRESS.get());
            } else if (startTLSInProgress.get()) {
                throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_STARTTLS_IN_PROGRESS.get());
            } else if (saslBindInProgress.get() && message.getProtocolOpType() != OP_TYPE_BIND_REQUEST) {
                throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_SASLBIND_IN_PROGRESS.get());
            }
            boolean result;
            switch (message.getProtocolOpType()) {
            case OP_TYPE_ABANDON_REQUEST:
                return processAbandonRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_ADD_REQUEST:
                return processAddRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_BIND_REQUEST:
                boolean isSaslBind =
                    message.getBindRequestProtocolOp().getAuthenticationType() == AuthenticationType.SASL;
                bindInProgress.set(true);
                if (isSaslBind) {
                    saslBindInProgress.set(true);
                }
                result = processBindRequest(queueingStrategy, message, opControls, out);
                if (!result) {
                    bindInProgress.set(false);
                    if (isSaslBind) {
                        saslBindInProgress.set(false);
                    }
                }
                return result;
            case OP_TYPE_COMPARE_REQUEST:
                return processCompareRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_DELETE_REQUEST:
                return processDeleteRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_EXTENDED_REQUEST:
                boolean isStartTlsRequest = OID_START_TLS_REQUEST.equals(message.getExtendedRequestProtocolOp()
                        .getOID());
                if (isStartTlsRequest) {
                    startTLSInProgress.set(true);
                }
                result = processExtendedRequest(queueingStrategy, message, opControls, out);
                if (!result && isStartTlsRequest) {
                    startTLSInProgress.set(false);
                }
                return result;
            case OP_TYPE_MODIFY_REQUEST:
                return processModifyRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_MODIFY_DN_REQUEST:
                return processModifyDNRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_SEARCH_REQUEST:
                return processSearchRequest(queueingStrategy, message, opControls, out);
            case OP_TYPE_UNBIND_REQUEST:
                return processUnbindRequest(message, opControls);
            default:
                LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_INVALID_REQUEST_TYPE.get(
                        message.getProtocolOpName(), message.getMessageID());
                disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
                return false;
            }
        } catch (Exception e) {
            logger.traceException(e);
            LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_PROCESSING_FAILURE.get(message.getProtocolOpName(),
                    message.getMessageID(), e);
            disconnect(DisconnectReason.SERVER_ERROR, true, msg);
            return false;
        }
    }
    operationsPerformed.getAndIncrement();
    List<Control> opControls = message.getControls();
    /**
     * Processes the provided LDAP message as an abandon request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the abandon request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processAbandonRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            disconnectControlsNotAllowed();
            return false;
        }
    // FIXME -- See if there is a bind in progress. If so, then deny
    // most kinds of operations.
        // Create the abandon operation and add it into the work queue.
        AbandonRequestProtocolOp protocolOp = message.getAbandonRequestProtocolOp();
        AbandonOperationBasis abandonOp = new AbandonOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getIDToAbandon());
        abandonOp.setAttachment(REACTIVE_OUT, out);
    // Figure out what type of operation we're dealing with based on the
    // LDAP message. Abandon and unbind requests will be processed here.
    // All other types of requests will be encapsulated into operations
    // and append into the work queue to be picked up by a worker
    // thread. Any other kinds of LDAP messages (e.g., response
    // messages) are illegal and will result in the connection being
    // terminated.
    try
    {
      if (bindInProgress.get())
      {
        throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_BIND_IN_PROGRESS.get());
      }
      else if (startTLSInProgress.get())
      {
        throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_STARTTLS_IN_PROGRESS.get());
      }
      else if (saslBindInProgress.get() && message.getProtocolOpType() != OP_TYPE_BIND_REQUEST)
      {
        throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_ENQUEUE_SASLBIND_IN_PROGRESS.get());
      }
        try {
            addOperationInProgress(queueingStrategy, abandonOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
      boolean result;
      switch (message.getProtocolOpType())
      {
      case OP_TYPE_ABANDON_REQUEST:
        return processAbandonRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_ADD_REQUEST:
        return processAddRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_BIND_REQUEST:
        boolean isSaslBind = message.getBindRequestProtocolOp().getAuthenticationType() == AuthenticationType.SASL;
        bindInProgress.set(true);
        if (isSaslBind)
        {
          saslBindInProgress.set(true);
            // Don't send an error response since abandon operations
            // don't have a response.
        }
        result = processBindRequest(queueingStrategy, message, opControls, out);
        if(!result)
        {
          bindInProgress.set(false);
          if (isSaslBind)
          {
            saslBindInProgress.set(false);
          }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as an add request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the add request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processAddRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
        }
        return result;
      case OP_TYPE_COMPARE_REQUEST:
        return processCompareRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_DELETE_REQUEST:
        return processDeleteRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_EXTENDED_REQUEST:
        boolean isStartTlsRequest = OID_START_TLS_REQUEST.equals(message.getExtendedRequestProtocolOp().getOID());
        if (isStartTlsRequest)
        {
          startTLSInProgress.set(true);
        // Create the add operation and add it into the work queue.
        AddRequestProtocolOp protocolOp = message.getAddRequestProtocolOp();
        AddOperationBasis addOp = new AddOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getAttributes());
        addOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, addOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            for (String referral : de.getReferralURLs()) {
                result.addReferralURI(referral);
            }
            out.onNext(result);
            out.onComplete();
        }
        result = processExtendedRequest(queueingStrategy, message, opControls, out);
        if (!result && isStartTlsRequest)
        {
          startTLSInProgress.set(false);
        return connectionValid;
    }
    private void disconnectControlsNotAllowed() {
        disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
    }
    /**
     * Processes the provided LDAP message as a bind request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the bind request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processBindRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        BindRequestProtocolOp protocolOp = message.getBindRequestProtocolOp();
        // See if this is an LDAPv2 bind request, and if so whether that
        // should be allowed.
        String versionString;
        switch (ldapVersion = protocolOp.getProtocolVersion()) {
        case 2:
            versionString = "2";
            if (!connectionHandler.allowLDAPv2()) {
                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                        ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
                out.onComplete();
                disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
                return false;
            }
            if (!controls.isEmpty()) {
                // LDAPv2 clients aren't allowed to send controls.
                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                        ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
                out.onComplete();
                disconnectControlsNotAllowed();
                return false;
            }
            break;
        case 3:
            versionString = "3";
            break;
        default:
            // Unsupported protocol version. RFC4511 states that we MUST send
            // a protocol error back to the client.
            out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion).toString()));
            out.onComplete();
            disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion));
            return false;
        }
        return result;
      case OP_TYPE_MODIFY_REQUEST:
        return processModifyRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_MODIFY_DN_REQUEST:
        return processModifyDNRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_SEARCH_REQUEST:
        return processSearchRequest(queueingStrategy, message, opControls, out);
      case OP_TYPE_UNBIND_REQUEST:
        return processUnbindRequest(message, opControls);
      default:
        LocalizableMessage msg =
            ERR_LDAP_DISCONNECT_DUE_TO_INVALID_REQUEST_TYPE.get(message
                .getProtocolOpName(), message.getMessageID());
        disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
        ByteString bindDN = protocolOp.getDN();
        BindOperationBasis bindOp;
        switch (protocolOp.getAuthenticationType()) {
        case SIMPLE:
            bindOp = new BindOperationBasis(this, nextOperationID.getAndIncrement(), message.getMessageID(), controls,
                    versionString, bindDN, protocolOp.getSimplePassword());
            break;
        case SASL:
            bindOp = new BindOperationBasis(this, nextOperationID.getAndIncrement(), message.getMessageID(), controls,
                    versionString, bindDN, protocolOp.getSASLMechanism(), protocolOp.getSASLCredentials());
            break;
        default:
            // This is an invalid authentication type, and therefore a
            // protocol error. As per RFC 2251, a protocol error in a bind
            // request must result in terminating the connection.
            LocalizableMessage msg = ERR_LDAP_INVALID_BIND_AUTH_TYPE.get(message.getMessageID(),
                    protocolOp.getAuthenticationType());
            disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
            return false;
        }
        // Add the operation into the work queue.
        bindOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, bindOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newBindResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            for (String referral : de.getReferralURLs()) {
                result.addReferralURI(referral);
            }
            out.onNext(result);
            out.onComplete();
            // If it was a protocol error, then terminate the connection.
            if (de.getResultCode() == ResultCode.PROTOCOL_ERROR) {
                LocalizableMessage msg = ERR_LDAP_DISCONNECT_DUE_TO_BIND_PROTOCOL_ERROR.get(message.getMessageID(),
                        de.getMessageObject());
                disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
            }
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as a compare request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the compare request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processCompareRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
        }
        CompareRequestProtocolOp protocolOp = message.getCompareRequestProtocolOp();
        CompareOperationBasis compareOp = new CompareOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getAttributeType(),
                protocolOp.getAssertionValue());
        // Add the operation into the work queue.
        compareOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, compareOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final CompareResult result = Responses.newCompareResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            out.onNext(result);
            out.onComplete();
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as a delete request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the delete request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processDeleteRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
        }
        DeleteRequestProtocolOp protocolOp = message.getDeleteRequestProtocolOp();
        DeleteOperationBasis deleteOp = new DeleteOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getDN());
        // Add the operation into the work queue.
        deleteOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, deleteOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            out.onNext(result);
            out.onComplete();
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as an extended request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the extended request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processExtendedRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        // See if this is an LDAPv2 client. If it is, then they should not
        // be issuing extended requests. We can't send a response that we
        // can be sure they can understand, so we have no choice but to
        // close the connection.
        if (ldapVersion == 2) {
            // LDAPv2 clients aren't allowed to send controls.
            LocalizableMessage msg = ERR_LDAPV2_EXTENDED_REQUEST_NOT_ALLOWED.get(getConnectionID(),
                    message.getMessageID());
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(msg.toString()));
            out.onComplete();
            logger.error(msg);
            disconnect(DisconnectReason.PROTOCOL_ERROR, false, msg);
            return false;
        }
        // FIXME -- Do we need to handle certain types of request here?
        // -- StartTLS requests
        // -- Cancel requests
        ExtendedRequestProtocolOp protocolOp = message.getExtendedRequestProtocolOp();
        ExtendedOperationBasis extendedOp = new ExtendedOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getOID(), protocolOp.getValue());
        // Add the operation into the work queue.
        extendedOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, extendedOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
                    .setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            out.onNext(result);
            out.onComplete();
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as a modify request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the modify request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processModifyRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
        }
        ModifyRequestProtocolOp protocolOp = message.getModifyRequestProtocolOp();
        ModifyOperationBasis modifyOp = new ModifyOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getDN(), protocolOp.getModifications());
        // Add the operation into the work queue.
        modifyOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, modifyOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
                    .setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            out.onNext(result);
            out.onComplete();
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as a modify DN request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the modify DN request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processModifyDNRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
        }
        ModifyDNRequestProtocolOp protocolOp = message.getModifyDNRequestProtocolOp();
        ModifyDNOperationBasis modifyDNOp = new ModifyDNOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getEntryDN(), protocolOp.getNewRDN(),
                protocolOp.deleteOldRDN(), protocolOp.getNewSuperior());
        // Add the operation into the work queue.
        modifyDNOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, modifyDNOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
                    .setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            for (Control control : modifyDNOp.getResponseControls()) {
                result.addControl(Converters.from(control));
            }
            out.onNext(result);
            out.onComplete();
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as a search request.
     *
     * @param queueingStrategy
     *            The {@link QueueingStrategy} to use for operation
     * @param message
     *            The LDAP message containing the search request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processSearchRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
        }
        SearchRequestProtocolOp protocolOp = message.getSearchRequestProtocolOp();
        SearchOperationBasis searchOp = new SearchOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls, protocolOp.getBaseDN(), protocolOp.getScope(),
                protocolOp.getDereferencePolicy(), protocolOp.getSizeLimit(), protocolOp.getTimeLimit(),
                protocolOp.getTypesOnly(), protocolOp.getFilter(), protocolOp.getAttributes());
        // Add the operation into the work queue.
        searchOp.setAttachment(REACTIVE_OUT, out);
        try {
            addOperationInProgress(queueingStrategy, searchOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode());
            if (de.getMessage() != null) {
                result.setDiagnosticMessage(de.getMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
                result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (searchOp.getResponseControls() != null) {
                for (Control control : searchOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
            out.onComplete();
        }
        return connectionValid;
    }
    /**
     * Processes the provided LDAP message as an unbind request.
     *
     * @param message
     *            The LDAP message containing the unbind request to process.
     * @param controls
     *            The set of pre-decoded request controls contained in the message.
     * @return <CODE>true</CODE> if the request was processed successfully, or <CODE>false</CODE> if not and the
     *         connection has been closed as a result (it is the responsibility of this method to close the connection).
     */
    private boolean processUnbindRequest(final LDAPMessage message, final List<Control> controls) {
        UnbindOperationBasis unbindOp = new UnbindOperationBasis(this, nextOperationID.getAndIncrement(),
                message.getMessageID(), controls);
        unbindOp.run();
        // The client connection will never be valid after an unbind.
        return false;
      }
    }
    catch (Exception e)
    {
      logger.traceException(e);
      LocalizableMessage msg =
          ERR_LDAP_DISCONNECT_DUE_TO_PROCESSING_FAILURE.get(message
              .getProtocolOpName(), message.getMessageID(), e);
      disconnect(DisconnectReason.SERVER_ERROR, true, msg);
      return false;
    }
  }
  /**
   * Processes the provided LDAP message as an abandon request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the abandon request to
   *          process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processAbandonRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      disconnectControlsNotAllowed();
      return false;
    }
    // Create the abandon operation and add it into the work queue.
    AbandonRequestProtocolOp protocolOp =
        message.getAbandonRequestProtocolOp();
    AbandonOperationBasis abandonOp =
        new AbandonOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getIDToAbandon());
    abandonOp.setAttachment(REACTIVE_OUT, out);
    @Override
    public String getMonitorSummary() {
        StringBuilder buffer = new StringBuilder();
        buffer.append("connID=\"");
        buffer.append(connectionID);
        buffer.append("\" connectTime=\"");
        buffer.append(getConnectTimeString());
        buffer.append("\" source=\"");
        buffer.append(clientAddress);
        buffer.append(":");
        buffer.append(clientPort);
        buffer.append("\" destination=\"");
        buffer.append(serverAddress);
        buffer.append(":");
        buffer.append(connectionHandler.getListeners().iterator().next().getPort());
        buffer.append("\" ldapVersion=\"");
        buffer.append(ldapVersion);
        buffer.append("\" authDN=\"");
    try
    {
      addOperationInProgress(queueingStrategy, abandonOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      // Don't send an error response since abandon operations
      // don't have a response.
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as an add request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the add request to process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processAddRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      // LDAPv2 clients aren't allowed to send controls.
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                          .setDiagnosticMessage(ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
      out.onComplete();
      disconnectControlsNotAllowed();
      return false;
    }
    // Create the add operation and add it into the work queue.
    AddRequestProtocolOp protocolOp = message.getAddRequestProtocolOp();
    AddOperationBasis addOp =
        new AddOperationBasis(this, nextOperationID.getAndIncrement(),
            message.getMessageID(), controls, protocolOp.getDN(),
            protocolOp.getAttributes());
    addOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, addOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result = Responses.newResult(de.getResultCode())
          .setDiagnosticMessage(de.getLocalizedMessage())
          .setMatchedDN(de.getMatchedDN().toString());
      for(String referral : de.getReferralURLs()) {
        result.addReferralURI(referral);
      }
      out.onNext(result);
      out.onComplete();
    }
    return connectionValid;
  }
  private void disconnectControlsNotAllowed()
  {
    disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
  }
  /**
   * Processes the provided LDAP message as a bind request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the bind request to process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processBindRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    BindRequestProtocolOp protocolOp =
        message.getBindRequestProtocolOp();
    // See if this is an LDAPv2 bind request, and if so whether that
    // should be allowed.
    String versionString;
    switch (ldapVersion = protocolOp.getProtocolVersion())
    {
    case 2:
      versionString = "2";
      if (!connectionHandler.allowLDAPv2())
      {
        out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
                            .setDiagnosticMessage(ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
        out.onComplete();
        disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
        return false;
      }
      if (!controls.isEmpty())
      {
        // LDAPv2 clients aren't allowed to send controls.
        out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
            .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
        out.onComplete();
        disconnectControlsNotAllowed();
        return false;
      }
      break;
    case 3:
      versionString = "3";
      break;
    default:
      // Unsupported protocol version. RFC4511 states that we MUST send
      // a protocol error back to the client.
      out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
                          .setDiagnosticMessage(ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion).toString()));
      out.onComplete();
      disconnect(DisconnectReason.PROTOCOL_ERROR, false,
          ERR_LDAP_UNSUPPORTED_PROTOCOL_VERSION.get(ldapVersion));
      return false;
    }
    ByteString bindDN = protocolOp.getDN();
    BindOperationBasis bindOp;
    switch (protocolOp.getAuthenticationType())
    {
    case SIMPLE:
      bindOp =
          new BindOperationBasis(this, nextOperationID
              .getAndIncrement(), message.getMessageID(), controls,
              versionString, bindDN, protocolOp.getSimplePassword());
      break;
    case SASL:
      bindOp =
          new BindOperationBasis(this, nextOperationID
              .getAndIncrement(), message.getMessageID(), controls,
              versionString, bindDN, protocolOp.getSASLMechanism(),
              protocolOp.getSASLCredentials());
      break;
    default:
      // This is an invalid authentication type, and therefore a
      // protocol error. As per RFC 2251, a protocol error in a bind
      // request must result in terminating the connection.
      LocalizableMessage msg =
          ERR_LDAP_INVALID_BIND_AUTH_TYPE.get(message.getMessageID(),
              protocolOp.getAuthenticationType());
      disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
      return false;
    }
    // Add the operation into the work queue.
    bindOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, bindOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result = Responses.newBindResult(de.getResultCode())
                                                      .setDiagnosticMessage(de.getLocalizedMessage())
                                                      .setMatchedDN(de.getMatchedDN().toString());
      for(String referral : de.getReferralURLs())
      {
        result.addReferralURI(referral);
      }
      out.onNext(result);
      out.onComplete();
      // If it was a protocol error, then terminate the connection.
      if (de.getResultCode() == ResultCode.PROTOCOL_ERROR)
      {
        LocalizableMessage msg =
            ERR_LDAP_DISCONNECT_DUE_TO_BIND_PROTOCOL_ERROR.get(message
                .getMessageID(), de.getMessageObject());
        disconnect(DisconnectReason.PROTOCOL_ERROR, true, msg);
      }
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as a compare request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the compare request to
   *          process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processCompareRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      // LDAPv2 clients aren't allowed to send controls.
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                          .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
      out.onComplete();
      disconnectControlsNotAllowed();
      return false;
    }
    CompareRequestProtocolOp protocolOp =
        message.getCompareRequestProtocolOp();
    CompareOperationBasis compareOp =
        new CompareOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getDN(), protocolOp.getAttributeType(),
            protocolOp.getAssertionValue());
    // Add the operation into the work queue.
    compareOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, compareOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final CompareResult result = Responses.newCompareResult(de.getResultCode())
                                            .setDiagnosticMessage(de.getLocalizedMessage())
                                            .setMatchedDN(de.getMatchedDN().toString());
      result.getReferralURIs().addAll(de.getReferralURLs());
      out.onNext(result);
      out.onComplete();
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as a delete request.
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the delete request to process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processDeleteRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      // LDAPv2 clients aren't allowed to send controls.
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                          .setDiagnosticMessage( ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
      out.onComplete();
      disconnectControlsNotAllowed();
      return false;
    }
    DeleteRequestProtocolOp protocolOp =
        message.getDeleteRequestProtocolOp();
    DeleteOperationBasis deleteOp =
        new DeleteOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getDN());
    // Add the operation into the work queue.
    deleteOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, deleteOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result = Responses.newResult(de.getResultCode())
                                     .setDiagnosticMessage(de.getLocalizedMessage())
                                     .setMatchedDN(de.getMatchedDN().toString());
      result.getReferralURIs().addAll(de.getReferralURLs());
      out.onNext(result);
      out.onComplete();
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as an extended request.
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the extended request to
   *          process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processExtendedRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
      final List<Control> controls, final FlowableEmitter<Response> out)
  {
    // See if this is an LDAPv2 client. If it is, then they should not
    // be issuing extended requests. We can't send a response that we
    // can be sure they can understand, so we have no choice but to
    // close the connection.
    if (ldapVersion == 2)
    {
      // LDAPv2 clients aren't allowed to send controls.
      LocalizableMessage msg = ERR_LDAPV2_EXTENDED_REQUEST_NOT_ALLOWED.get(getConnectionID(), message.getMessageID());
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                          .setDiagnosticMessage(msg.toString()));
      out.onComplete();
      logger.error(msg);
      disconnect(DisconnectReason.PROTOCOL_ERROR, false, msg);
      return false;
    }
    // FIXME -- Do we need to handle certain types of request here?
    // -- StartTLS requests
    // -- Cancel requests
    ExtendedRequestProtocolOp protocolOp =
        message.getExtendedRequestProtocolOp();
    ExtendedOperationBasis extendedOp =
        new ExtendedOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getOID(), protocolOp.getValue());
    // Add the operation into the work queue.
    extendedOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, extendedOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result = Responses.newResult(de.getResultCode())
                                     .setDiagnosticMessage(de.getMessage())
                                     .setMatchedDN(de.getMatchedDN().toString());
      result.getReferralURIs().addAll(de.getReferralURLs());
      out.onNext(result);
      out.onComplete();
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as a modify request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the modify request to process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processModifyRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      // LDAPv2 clients aren't allowed to send controls.
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                          .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
      out.onComplete();
      disconnectControlsNotAllowed();
      return false;
    }
    ModifyRequestProtocolOp protocolOp =
        message.getModifyRequestProtocolOp();
    ModifyOperationBasis modifyOp =
        new ModifyOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getDN(), protocolOp.getModifications());
    // Add the operation into the work queue.
    modifyOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, modifyOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result =
          Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage()).setMatchedDN(de.getMatchedDN()
              .toString());
      result.getReferralURIs().addAll(de.getReferralURLs());
      out.onNext(result);
      out.onComplete();
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as a modify DN request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the modify DN request to
   *          process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processModifyDNRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      // LDAPv2 clients aren't allowed to send controls.
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
          .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
      out.onComplete();
      disconnectControlsNotAllowed();
      return false;
    }
    ModifyDNRequestProtocolOp protocolOp =
        message.getModifyDNRequestProtocolOp();
    ModifyDNOperationBasis modifyDNOp =
        new ModifyDNOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getEntryDN(), protocolOp.getNewRDN(), protocolOp
                .deleteOldRDN(), protocolOp.getNewSuperior());
    // Add the operation into the work queue.
    modifyDNOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, modifyDNOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result =
          Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage()).setMatchedDN(de.getMatchedDN()
              .toString());
      result.getReferralURIs().addAll(de.getReferralURLs());
      for(Control control : modifyDNOp.getResponseControls()) {
        result.addControl(Converters.from(control));
      }
      out.onNext(result);
      out.onComplete();
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as a search request.
   *
   * @param queueingStrategy
   *          The {@link QueueingStrategy} to use for operation
   * @param message
   *          The LDAP message containing the search request to process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processSearchRequest(final QueueingStrategy queueingStrategy, final LDAPMessage message,
          final List<Control> controls, final FlowableEmitter<Response> out)
  {
    if (ldapVersion == 2 && !controls.isEmpty())
    {
      // LDAPv2 clients aren't allowed to send controls.
      out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
          .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
      out.onComplete();
      disconnectControlsNotAllowed();
      return false;
    }
    SearchRequestProtocolOp protocolOp =
        message.getSearchRequestProtocolOp();
    SearchOperationBasis searchOp =
        new SearchOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls,
            protocolOp.getBaseDN(), protocolOp.getScope(), protocolOp
                .getDereferencePolicy(), protocolOp.getSizeLimit(),
            protocolOp.getTimeLimit(), protocolOp.getTypesOnly(),
            protocolOp.getFilter(), protocolOp.getAttributes());
    // Add the operation into the work queue.
    searchOp.setAttachment(REACTIVE_OUT, out);
    try
    {
      addOperationInProgress(queueingStrategy, searchOp);
    }
    catch (DirectoryException de)
    {
      logger.traceException(de);
      final Result result = Responses.newResult(de.getResultCode());
      if (de.getMessage() != null)
      {
        result.setDiagnosticMessage(de.getMessage());
      }
      if (de.getMatchedDN() != null)
      {
        result.setMatchedDN(de.getMatchedDN().toString());
      }
      if (de.getReferralURLs() != null)
      {
        result.getReferralURIs().addAll(de.getReferralURLs());
      }
      if (searchOp.getResponseControls() != null)
      {
        for (Control control : searchOp.getResponseControls())
        {
          result.addControl(Converters.from(control));
        DN authDN = getAuthenticationInfo().getAuthenticationDN();
        if (authDN != null) {
            buffer.append(authDN);
        }
      }
      out.onNext(result);
      out.onComplete();
        buffer.append("\" security=\"");
        buffer.append("none");
        buffer.append("\" opsInProgress=\"");
        buffer.append(operationsInProgress.size());
        buffer.append("\"");
        int countPSearch = getPersistentSearches().size();
        if (countPSearch > 0) {
            buffer.append(" persistentSearches=\"");
            buffer.append(countPSearch);
            buffer.append("\"");
        }
        return buffer.toString();
    }
    return connectionValid;
  }
  /**
   * Processes the provided LDAP message as an unbind request.
   *
   * @param message
   *          The LDAP message containing the unbind request to process.
   * @param controls
   *          The set of pre-decoded request controls contained in the
   *          message.
   * @return <CODE>true</CODE> if the request was processed
   *         successfully, or <CODE>false</CODE> if not and the
   *         connection has been closed as a result (it is the
   *         responsibility of this method to close the connection).
   */
  private boolean processUnbindRequest(final LDAPMessage message, final List<Control> controls)
  {
    UnbindOperationBasis unbindOp =
        new UnbindOperationBasis(this, nextOperationID
            .getAndIncrement(), message.getMessageID(), controls);
    unbindOp.run();
    // The client connection will never be valid after an unbind.
    return false;
  }
  @Override
  public String getMonitorSummary()
  {
    StringBuilder buffer = new StringBuilder();
    buffer.append("connID=\"");
    buffer.append(connectionID);
    buffer.append("\" connectTime=\"");
    buffer.append(getConnectTimeString());
    buffer.append("\" source=\"");
    buffer.append(clientAddress);
    buffer.append(":");
    buffer.append(clientPort);
    buffer.append("\" destination=\"");
    buffer.append(serverAddress);
    buffer.append(":");
    buffer.append(connectionHandler.getListeners().iterator().next().getPort());
    buffer.append("\" ldapVersion=\"");
    buffer.append(ldapVersion);
    buffer.append("\" authDN=\"");
    DN authDN = getAuthenticationInfo().getAuthenticationDN();
    if (authDN != null)
    {
      buffer.append(authDN);
    /**
     * Appends a string representation of this client connection to the provided buffer.
     *
     * @param buffer
     *            The buffer to which the information should be appended.
     */
    @Override
    public void toString(StringBuilder buffer) {
        buffer.append("LDAP client connection from ");
        buffer.append(clientAddress);
        buffer.append(":");
        buffer.append(clientPort);
        buffer.append(" to ");
        buffer.append(serverAddress);
        buffer.append(":");
        buffer.append(serverPort);
    }
    buffer.append("\" security=\"");
    buffer.append("none");
    buffer.append("\" opsInProgress=\"");
    buffer.append(operationsInProgress.size());
    buffer.append("\"");
    int countPSearch = getPersistentSearches().size();
    if (countPSearch > 0)
    {
      buffer.append(" persistentSearches=\"");
      buffer.append(countPSearch);
      buffer.append("\"");
    @Override
    public boolean prepareTLS(LocalizableMessageBuilder unavailableReason) {
        throw new UnsupportedOperationException();
    }
    return buffer.toString();
  }
  /**
   * Appends a string representation of this client connection to the
   * provided buffer.
   *
   * @param buffer
   *          The buffer to which the information should be appended.
   */
  @Override
  public void toString(StringBuilder buffer)
  {
    buffer.append("LDAP client connection from ");
    buffer.append(clientAddress);
    buffer.append(":");
    buffer.append(clientPort);
    buffer.append(" to ");
    buffer.append(serverAddress);
    buffer.append(":");
    buffer.append(serverPort);
  }
  @Override
  public boolean prepareTLS(LocalizableMessageBuilder unavailableReason)
  {
    throw new UnsupportedOperationException();
  }
  /**
   * Retrieves the length of time in milliseconds that this client
   * connection has been idle. <BR>
   * <BR>
   * Note that the default implementation will always return zero.
   * Subclasses associated with connection handlers should override this
   * method if they wish to provided idle time limit functionality.
   *
   * @return The length of time in milliseconds that this client
   *         connection has been idle.
   */
  @Override
  public long getIdleTime()
  {
    if (operationsInProgress.isEmpty()
        && getPersistentSearches().isEmpty())
    {
      return TimeThread.getTime() - lastCompletionTime.get();
    /**
     * Retrieves the length of time in milliseconds that this client connection has been idle. <BR>
     * <BR>
     * Note that the default implementation will always return zero. Subclasses associated with connection handlers
     * should override this method if they wish to provided idle time limit functionality.
     *
     * @return The length of time in milliseconds that this client connection has been idle.
     */
    @Override
    public long getIdleTime() {
        if (operationsInProgress.isEmpty() && getPersistentSearches().isEmpty()) {
            return TimeThread.getTime() - lastCompletionTime.get();
        } else {
            // There's at least one operation in progress, so it's not idle.
            return 0L;
        }
    }
    else
    {
      // There's at least one operation in progress, so it's not idle.
      return 0L;
    /**
     * Return the certificate chain array associated with a connection.
     *
     * @return The array of certificates associated with a connection.
     */
    public Certificate[] getClientCertificateChain() {
        return new Certificate[0];
    }
  }
  /**
   * Return the certificate chain array associated with a connection.
   *
   * @return The array of certificates associated with a connection.
   */
  public Certificate[] getClientCertificateChain()
  {
    return new Certificate[0];
  }
  @Override
  public int getSSF()
  {
    return 0;
  }
    @Override
    public int getSSF() {
        return 0;
    }
}
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -92,1025 +92,859 @@
import com.forgerock.reactive.Stream;
/**
 * This class defines a connection handler that will be used for communicating
 * with clients over LDAP. It is actually implemented in two parts: as a
 * connection handler and one or more request handlers. The connection handler
 * is responsible for accepting new connections and registering each of them
 * with a request handler. The request handlers then are responsible for reading
 * requests from the clients and parsing them as operations. A single request
 * handler may be used, but having multiple handlers might provide better
 * performance in a multi-CPU system.
 * This class defines a connection handler that will be used for communicating with clients over LDAP. It is actually
 * implemented in two parts: as a connection handler and one or more request handlers. The connection handler is
 * responsible for accepting new connections and registering each of them with a request handler. The request handlers
 * then are responsible for reading requests from the clients and parsing them as operations. A single request handler
 * may be used, but having multiple handlers might provide better performance in a multi-CPU system.
 */
public final class LDAPConnectionHandler2 extends
    ConnectionHandler<LDAPConnectionHandlerCfg> implements
    ConfigurationChangeListener<LDAPConnectionHandlerCfg>,
    ServerShutdownListener, AlertGenerator
{
  /** Task run periodically by the connection finalizer. */
  private final class ConnectionFinalizerRunnable implements Runnable
  {
public final class LDAPConnectionHandler2 extends ConnectionHandler<LDAPConnectionHandlerCfg> implements
        ConfigurationChangeListener<LDAPConnectionHandlerCfg>, ServerShutdownListener, AlertGenerator {
    /** Task run periodically by the connection finalizer. */
    private final class ConnectionFinalizerRunnable implements Runnable {
        @Override
        public void run() {
            if (!connectionFinalizerActiveJobQueue.isEmpty()) {
                for (Runnable r : connectionFinalizerActiveJobQueue) {
                    r.run();
                }
                connectionFinalizerActiveJobQueue.clear();
            }
            // Switch the lists.
            synchronized (connectionFinalizerLock) {
                List<Runnable> tmp = connectionFinalizerActiveJobQueue;
                connectionFinalizerActiveJobQueue = connectionFinalizerPendingJobQueue;
                connectionFinalizerPendingJobQueue = tmp;
            }
        }
    }
    private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
    /** Default friendly name for the LDAP connection handler. */
    private static final String DEFAULT_FRIENDLY_NAME = "LDAP Connection Handler";
    /** SSL instance name used in context creation. */
    private static final String SSL_CONTEXT_INSTANCE_NAME = "TLS";
    private GrizzlyLDAPListener listener;
    /** The current configuration state. */
    private LDAPConnectionHandlerCfg currentConfig;
    /* Properties that cannot be modified dynamically */
    /** The set of addresses on which to listen for new connections. */
    private Set<InetSocketAddress> listenAddresses;
    /** The SSL client auth policy used by this connection handler. */
    private SSLClientAuthPolicy sslClientAuthPolicy;
    /** The backlog that will be used for the accept queue. */
    private int backlog;
    /** Indicates whether to allow the reuse address socket option. */
    private boolean allowReuseAddress;
    /** Indicates whether the Directory Server is in the process of shutting down. */
    private volatile boolean shutdownRequested;
    /* Internal LDAP connection handler state */
    /** Indicates whether this connection handler is enabled. */
    private boolean enabled;
    /** The set of clients that are explicitly allowed access to the server. */
    private Collection<AddressMask> allowedClients;
    /** The set of clients that have been explicitly denied access to the server. */
    private Collection<AddressMask> deniedClients;
    /** The set of listeners for this connection handler. */
    private List<HostPort> listeners;
    /** The set of statistics collected for this connection handler. */
    private LDAPStatistics statTracker;
    /** The client connection monitor provider associated with this connection handler. */
    private ClientConnectionMonitorProvider connMonitor;
    /** The unique name assigned to this connection handler. */
    private String handlerName;
    /** The protocol used by this connection handler. */
    private String protocol;
    /** Queueing strategy. */
    private final QueueingStrategy queueingStrategy;
    /**
     * The condition variable that will be used by the start method to wait for the socket port to be opened and ready
     * to process requests before returning.
     */
    private final Object waitListen = new Object();
    /** The friendly name of this connection handler. */
    private String friendlyName;
    /**
     * SSL context.
     *
     * @see LDAPConnectionHandler2#sslEngine
     */
    private SSLContext sslContext;
    /** The SSL engine is used for obtaining default SSL parameters. */
    private SSLEngine sslEngine;
    /**
     * Connection finalizer thread.
     * <p>
     * This thread is defers closing clients for approximately 100ms. This gives the client a chance to close the
     * connection themselves before the server thus avoiding leaving the server side in the TIME WAIT state.
     */
    private final Object connectionFinalizerLock = new Object();
    private ScheduledExecutorService connectionFinalizer;
    private List<Runnable> connectionFinalizerActiveJobQueue;
    private List<Runnable> connectionFinalizerPendingJobQueue;
    private final List<ClientConnection> connectionList = Collections
            .synchronizedList(new ArrayList<ClientConnection>());
    /**
     * Creates a new instance of this LDAP connection handler. It must be initialized before it may be used.
     */
    public LDAPConnectionHandler2() {
        this(new WorkQueueStrategy(), null); // Use name from configuration.
    }
    /**
     * Creates a new instance of this LDAP connection handler, using a queueing strategy. It must be initialized before
     * it may be used.
     *
     * @param strategy
     *            Request handling strategy.
     * @param friendlyName
     *            The name of of this connection handler, or {@code null} if the name should be taken from the
     *            configuration.
     */
    public LDAPConnectionHandler2(QueueingStrategy strategy, String friendlyName) {
        super(friendlyName != null ? friendlyName : DEFAULT_FRIENDLY_NAME + " Thread");
        this.friendlyName = friendlyName;
        this.queueingStrategy = strategy;
    }
    /**
     * Indicates whether this connection handler should allow interaction with LDAPv2 clients.
     *
     * @return <CODE>true</CODE> if LDAPv2 is allowed, or <CODE>false</CODE> if not.
     */
    public boolean allowLDAPv2() {
        return currentConfig.isAllowLDAPV2();
    }
    /**
     * Indicates whether this connection handler should allow the use of the StartTLS extended operation.
     *
     * @return <CODE>true</CODE> if StartTLS is allowed, or <CODE>false</CODE> if not.
     */
    public boolean allowStartTLS() {
        return currentConfig.isAllowStartTLS() && !currentConfig.isUseSSL();
    }
    @Override
    public void run()
    {
      if (!connectionFinalizerActiveJobQueue.isEmpty())
      {
        for (Runnable r : connectionFinalizerActiveJobQueue)
        {
          r.run();
    public ConfigChangeResult applyConfigurationChange(LDAPConnectionHandlerCfg config) {
        final ConfigChangeResult ccr = new ConfigChangeResult();
        // Note that the following properties cannot be modified:
        // * listen port and addresses
        // * use ssl
        // * ssl policy
        // * ssl cert nickname
        // * accept backlog
        // * tcp reuse address
        // * num request handler
        // Clear the stat tracker if LDAPv2 is being enabled.
        if (currentConfig.isAllowLDAPV2() != config.isAllowLDAPV2() && config.isAllowLDAPV2()) {
            statTracker.clearStatistics();
        }
        connectionFinalizerActiveJobQueue.clear();
      }
      // Switch the lists.
      synchronized (connectionFinalizerLock)
      {
        List<Runnable> tmp = connectionFinalizerActiveJobQueue;
        connectionFinalizerActiveJobQueue = connectionFinalizerPendingJobQueue;
        connectionFinalizerPendingJobQueue = tmp;
      }
    }
  }
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
        // Apply the changes.
        currentConfig = config;
        enabled = config.isEnabled();
        allowedClients = config.getAllowedClient();
        deniedClients = config.getDeniedClient();
  /** Default friendly name for the LDAP connection handler. */
  private static final String DEFAULT_FRIENDLY_NAME = "LDAP Connection Handler";
        // Reconfigure SSL if needed.
        try {
            configureSSL(config);
        } catch (DirectoryException e) {
            logger.traceException(e);
            ccr.setResultCode(e.getResultCode());
            ccr.addMessage(e.getMessageObject());
            return ccr;
        }
  /** SSL instance name used in context creation. */
  private static final String SSL_CONTEXT_INSTANCE_NAME = "TLS";
        if (config.isAllowLDAPV2()) {
            DirectoryServer.registerSupportedLDAPVersion(2, this);
        } else {
            DirectoryServer.deregisterSupportedLDAPVersion(2, this);
        }
  private GrizzlyLDAPListener listener;
  /** The current configuration state. */
  private LDAPConnectionHandlerCfg currentConfig;
  /* Properties that cannot be modified dynamically */
  /** The set of addresses on which to listen for new connections. */
  private Set<InetSocketAddress> listenAddresses;
  /** The SSL client auth policy used by this connection handler. */
  private SSLClientAuthPolicy sslClientAuthPolicy;
  /** The backlog that will be used for the accept queue. */
  private int backlog;
  /** Indicates whether to allow the reuse address socket option. */
  private boolean allowReuseAddress;
  /** Indicates whether the Directory Server is in the process of shutting down. */
  private volatile boolean shutdownRequested;
  /* Internal LDAP connection handler state */
  /** Indicates whether this connection handler is enabled. */
  private boolean enabled;
  /** The set of clients that are explicitly allowed access to the server. */
  private Collection<AddressMask> allowedClients;
  /** The set of clients that have been explicitly denied access to the server. */
  private Collection<AddressMask> deniedClients;
  /** The set of listeners for this connection handler. */
  private List<HostPort> listeners;
  /** The set of statistics collected for this connection handler. */
  private LDAPStatistics statTracker;
  /** The client connection monitor provider associated with this connection handler. */
  private ClientConnectionMonitorProvider connMonitor;
  /** The unique name assigned to this connection handler. */
  private String handlerName;
  /** The protocol used by this connection handler. */
  private String protocol;
  /** Queueing strategy. */
  private final QueueingStrategy queueingStrategy;
  /**
   * The condition variable that will be used by the start method to wait for
   * the socket port to be opened and ready to process requests before
   * returning.
   */
  private final Object waitListen = new Object();
  /** The friendly name of this connection handler. */
  private String friendlyName;
  /**
   * SSL context.
   *
   * @see LDAPConnectionHandler2#sslEngine
   */
  private SSLContext sslContext;
  /** The SSL engine is used for obtaining default SSL parameters. */
  private SSLEngine sslEngine;
  /**
   * Connection finalizer thread.
   * <p>
   * This thread is defers closing clients for approximately 100ms. This gives
   * the client a chance to close the connection themselves before the server
   * thus avoiding leaving the server side in the TIME WAIT state.
   */
  private final Object connectionFinalizerLock = new Object();
  private ScheduledExecutorService connectionFinalizer;
  private List<Runnable> connectionFinalizerActiveJobQueue;
  private List<Runnable> connectionFinalizerPendingJobQueue;
  private final List<ClientConnection> connectionList = Collections.synchronizedList(new ArrayList<ClientConnection>());
  /**
   * Creates a new instance of this LDAP connection handler. It must be
   * initialized before it may be used.
   */
  public LDAPConnectionHandler2()
  {
    this(new WorkQueueStrategy(), null); // Use name from configuration.
  }
  /**
   * Creates a new instance of this LDAP connection handler, using a queueing
   * strategy. It must be initialized before it may be used.
   *
   * @param strategy
   *          Request handling strategy.
   * @param friendlyName
   *          The name of of this connection handler, or {@code null} if the
   *          name should be taken from the configuration.
   */
  public LDAPConnectionHandler2(QueueingStrategy strategy, String friendlyName)
  {
    super(friendlyName != null ? friendlyName : DEFAULT_FRIENDLY_NAME + " Thread");
    this.friendlyName = friendlyName;
    this.queueingStrategy = strategy;
  }
  /**
   * Indicates whether this connection handler should allow interaction with
   * LDAPv2 clients.
   *
   * @return <CODE>true</CODE> if LDAPv2 is allowed, or <CODE>false</CODE> if
   *         not.
   */
  public boolean allowLDAPv2()
  {
    return currentConfig.isAllowLDAPV2();
  }
  /**
   * Indicates whether this connection handler should allow the use of the
   * StartTLS extended operation.
   *
   * @return <CODE>true</CODE> if StartTLS is allowed, or <CODE>false</CODE> if
   *         not.
   */
  public boolean allowStartTLS()
  {
    return currentConfig.isAllowStartTLS() && !currentConfig.isUseSSL();
  }
  @Override
  public ConfigChangeResult applyConfigurationChange(
      LDAPConnectionHandlerCfg config)
  {
    final ConfigChangeResult ccr = new ConfigChangeResult();
    // Note that the following properties cannot be modified:
    // * listen port and addresses
    // * use ssl
    // * ssl policy
    // * ssl cert nickname
    // * accept backlog
    // * tcp reuse address
    // * num request handler
    // Clear the stat tracker if LDAPv2 is being enabled.
    if (currentConfig.isAllowLDAPV2() != config.isAllowLDAPV2()
        && config.isAllowLDAPV2())
    {
      statTracker.clearStatistics();
        return ccr;
    }
    // Apply the changes.
    currentConfig = config;
    enabled = config.isEnabled();
    allowedClients = config.getAllowedClient();
    deniedClients = config.getDeniedClient();
    // Reconfigure SSL if needed.
    try
    {
      configureSSL(config);
    }
    catch (DirectoryException e)
    {
      logger.traceException(e);
      ccr.setResultCode(e.getResultCode());
      ccr.addMessage(e.getMessageObject());
      return ccr;
    private void configureSSL(LDAPConnectionHandlerCfg config) throws DirectoryException {
        protocol = config.isUseSSL() ? "LDAPS" : "LDAP";
        if (config.isUseSSL() || config.isAllowStartTLS()) {
            sslContext = createSSLContext(config);
            sslEngine = createSSLEngine(config, sslContext);
        } else {
            sslContext = null;
            sslEngine = null;
        }
    }
    if (config.isAllowLDAPV2())
    {
      DirectoryServer.registerSupportedLDAPVersion(2, this);
    }
    else
    {
      DirectoryServer.deregisterSupportedLDAPVersion(2, this);
    @Override
    public void finalizeConnectionHandler(LocalizableMessage finalizeReason) {
        shutdownRequested = true;
        currentConfig.removeLDAPChangeListener(this);
        if (connMonitor != null) {
            DirectoryServer.deregisterMonitorProvider(connMonitor);
        }
        if (statTracker != null) {
            DirectoryServer.deregisterMonitorProvider(statTracker);
        }
        DirectoryServer.deregisterSupportedLDAPVersion(2, this);
        DirectoryServer.deregisterSupportedLDAPVersion(3, this);
        // Shutdown the connection finalizer and ensure that any pending
        // unclosed connections are closed.
        synchronized (connectionFinalizerLock) {
            connectionFinalizer.shutdown();
            connectionFinalizer = null;
            Runnable r = new ConnectionFinalizerRunnable();
            r.run(); // Flush active queue.
            r.run(); // Flush pending queue.
        }
    }
    return ccr;
  }
    /**
     * Retrieves information about the set of alerts that this generator may produce. The map returned should be between
     * the notification type for a particular notification and the human-readable description for that notification.
     * This alert generator must not generate any alerts with types that are not contained in this list.
     *
     * @return Information about the set of alerts that this generator may produce.
     */
    @Override
    public Map<String, String> getAlerts() {
        Map<String, String> alerts = new LinkedHashMap<>();
  private void configureSSL(LDAPConnectionHandlerCfg config)
      throws DirectoryException
  {
    protocol = config.isUseSSL() ? "LDAPS" : "LDAP";
    if (config.isUseSSL() || config.isAllowStartTLS())
    {
      sslContext = createSSLContext(config);
      sslEngine = createSSLEngine(config, sslContext);
    }
    else
    {
      sslContext = null;
      sslEngine = null;
    }
  }
        alerts.put(ALERT_TYPE_LDAP_CONNECTION_HANDLER_CONSECUTIVE_FAILURES,
                ALERT_DESCRIPTION_LDAP_CONNECTION_HANDLER_CONSECUTIVE_FAILURES);
        alerts.put(ALERT_TYPE_LDAP_CONNECTION_HANDLER_UNCAUGHT_ERROR,
                ALERT_DESCRIPTION_LDAP_CONNECTION_HANDLER_UNCAUGHT_ERROR);
  @Override
  public void finalizeConnectionHandler(LocalizableMessage finalizeReason)
  {
    shutdownRequested = true;
    currentConfig.removeLDAPChangeListener(this);
    if (connMonitor != null)
    {
      DirectoryServer.deregisterMonitorProvider(connMonitor);
        return alerts;
    }
    if (statTracker != null)
    {
      DirectoryServer.deregisterMonitorProvider(statTracker);
    /**
     * Retrieves the fully-qualified name of the Java class for this alert generator implementation.
     *
     * @return The fully-qualified name of the Java class for this alert generator implementation.
     */
    @Override
    public String getClassName() {
        return LDAPConnectionHandler2.class.getName();
    }
    DirectoryServer.deregisterSupportedLDAPVersion(2, this);
    DirectoryServer.deregisterSupportedLDAPVersion(3, this);
    // Shutdown the connection finalizer and ensure that any pending
    // unclosed connections are closed.
    synchronized (connectionFinalizerLock)
    {
      connectionFinalizer.shutdown();
      connectionFinalizer = null;
      Runnable r = new ConnectionFinalizerRunnable();
      r.run(); // Flush active queue.
      r.run(); // Flush pending queue.
    }
  }
  /**
   * Retrieves information about the set of alerts that this generator may
   * produce. The map returned should be between the notification type for a
   * particular notification and the human-readable description for that
   * notification. This alert generator must not generate any alerts with types
   * that are not contained in this list.
   *
   * @return Information about the set of alerts that this generator may
   *         produce.
   */
  @Override
  public Map<String, String> getAlerts()
  {
    Map<String, String> alerts = new LinkedHashMap<>();
    alerts.put(ALERT_TYPE_LDAP_CONNECTION_HANDLER_CONSECUTIVE_FAILURES,
        ALERT_DESCRIPTION_LDAP_CONNECTION_HANDLER_CONSECUTIVE_FAILURES);
    alerts.put(ALERT_TYPE_LDAP_CONNECTION_HANDLER_UNCAUGHT_ERROR,
        ALERT_DESCRIPTION_LDAP_CONNECTION_HANDLER_UNCAUGHT_ERROR);
    return alerts;
  }
  /**
   * Retrieves the fully-qualified name of the Java class for this alert
   * generator implementation.
   *
   * @return The fully-qualified name of the Java class for this alert generator
   *         implementation.
   */
  @Override
  public String getClassName()
  {
    return LDAPConnectionHandler2.class.getName();
  }
  /**
   * Retrieves the set of active client connections that have been established
   * through this connection handler.
   *
   * @return The set of active client connections that have been established
   *         through this connection handler.
   */
  @Override
  public Collection<ClientConnection> getClientConnections()
  {
    return connectionList;
  }
  /**
   * Retrieves the DN of the configuration entry with which this alert generator
   * is associated.
   *
   * @return The DN of the configuration entry with which this alert generator
   *         is associated.
   */
  @Override
  public DN getComponentEntryDN()
  {
    return currentConfig.dn();
  }
  @Override
  public String getConnectionHandlerName()
  {
    return handlerName;
  }
  @Override
  public Collection<String> getEnabledSSLCipherSuites()
  {
    final SSLEngine engine = sslEngine;
    if (engine != null)
    {
      return Arrays.asList(engine.getEnabledCipherSuites());
    }
    return super.getEnabledSSLCipherSuites();
  }
  @Override
  public Collection<String> getEnabledSSLProtocols()
  {
    final SSLEngine engine = sslEngine;
    if (engine != null)
    {
      return Arrays.asList(engine.getEnabledProtocols());
    }
    return super.getEnabledSSLProtocols();
  }
  @Override
  public Collection<HostPort> getListeners()
  {
    return listeners;
  }
  /**
   * Retrieves the maximum length of time in milliseconds that attempts to write
   * to LDAP client connections should be allowed to block.
   *
   * @return The maximum length of time in milliseconds that attempts to write
   *         to LDAP client connections should be allowed to block, or zero if
   *         there should not be any limit imposed.
   */
  public long getMaxBlockedWriteTimeLimit()
  {
    return currentConfig.getMaxBlockedWriteTimeLimit();
  }
  /**
   * Retrieves the maximum ASN.1 element value length that will be allowed by
   * this connection handler.
   *
   * @return The maximum ASN.1 element value length that will be allowed by this
   *         connection handler.
   */
  public int getMaxRequestSize()
  {
    return (int) currentConfig.getMaxRequestSize();
  }
  /**
   * Retrieves the size in bytes of the LDAP response message write buffer
   * defined for this connection handler.
   *
   * @return The size in bytes of the LDAP response message write buffer.
   */
  public int getBufferSize()
  {
    return (int) currentConfig.getBufferSize();
  }
  @Override
  public String getProtocol()
  {
    return protocol;
  }
  @Override
  public String getShutdownListenerName()
  {
    return handlerName;
  }
  /**
   * Retrieves the SSL client authentication policy for this connection handler.
   *
   * @return The SSL client authentication policy for this connection handler.
   */
  public SSLClientAuthPolicy getSSLClientAuthPolicy()
  {
    return sslClientAuthPolicy;
  }
  /**
   * Retrieves the set of statistics maintained by this connection handler.
   *
   * @return The set of statistics maintained by this connection handler.
   */
  public LDAPStatistics getStatTracker()
  {
    return statTracker;
  }
  @Override
  public void initializeConnectionHandler(ServerContext serverContext, LDAPConnectionHandlerCfg config)
      throws ConfigException, InitializationException
  {
    if (friendlyName == null)
    {
      friendlyName = config.dn().rdn().getFirstAVA().getAttributeValue().toString();
    /**
     * Retrieves the set of active client connections that have been established through this connection handler.
     *
     * @return The set of active client connections that have been established through this connection handler.
     */
    @Override
    public Collection<ClientConnection> getClientConnections() {
        return connectionList;
    }
    // Save this configuration for future reference.
    currentConfig = config;
    enabled = config.isEnabled();
    allowedClients = config.getAllowedClient();
    deniedClients = config.getDeniedClient();
    // Configure SSL if needed.
    try
    {
      // This call may disable the connector if wrong SSL settings
      configureSSL(config);
    }
    catch (DirectoryException e)
    {
      logger.traceException(e);
      throw new InitializationException(e.getMessageObject());
    /**
     * Retrieves the DN of the configuration entry with which this alert generator is associated.
     *
     * @return The DN of the configuration entry with which this alert generator is associated.
     */
    @Override
    public DN getComponentEntryDN() {
        return currentConfig.dn();
    }
    // Save properties that cannot be dynamically modified.
    allowReuseAddress = config.isAllowTCPReuseAddress();
    backlog = config.getAcceptBacklog();
    listenAddresses = new HashSet<>();
    for(InetAddress addr :config.getListenAddress()) {
      listenAddresses.add(new InetSocketAddress(addr, config.getListenPort()));
    @Override
    public String getConnectionHandlerName() {
        return handlerName;
    }
    // Construct a unique name for this connection handler, and put
    // together the set of listeners.
    listeners = new LinkedList<>();
    StringBuilder nameBuffer = new StringBuilder();
    nameBuffer.append(friendlyName);
    for (InetSocketAddress a : listenAddresses)
    {
      listeners.add(new HostPort(a.getHostName(), a.getPort()));
      nameBuffer.append(" ");
      nameBuffer.append(a.getHostName());
    }
    nameBuffer.append(" port ");
    nameBuffer.append(config.getListenPort());
    handlerName = nameBuffer.toString();
    // Attempt to bind to the listen port on all configured addresses to
    // verify whether the connection handler will be able to start.
    LocalizableMessage errorMessage =
        checkAnyListenAddressInUse(config.getListenAddress(), config.getListenPort(),
            allowReuseAddress, config.dn());
    if (errorMessage != null)
    {
      logger.error(errorMessage);
      throw new InitializationException(errorMessage);
    @Override
    public Collection<String> getEnabledSSLCipherSuites() {
        final SSLEngine engine = sslEngine;
        if (engine != null) {
            return Arrays.asList(engine.getEnabledCipherSuites());
        }
        return super.getEnabledSSLCipherSuites();
    }
    // Create a system property to store the LDAP(S) port the server is
    // listening to. This information can be displayed with jinfo.
    System.setProperty(protocol + "_port", String.valueOf(config.getListenPort()));
    // Create and start a connection finalizer thread for this
    // connection handler.
    connectionFinalizer = Executors
        .newSingleThreadScheduledExecutor(new DirectoryThread.Factory(
            "LDAP Connection Finalizer for connection handler " + toString()));
    connectionFinalizerActiveJobQueue = new ArrayList<>();
    connectionFinalizerPendingJobQueue = new ArrayList<>();
    connectionFinalizer.scheduleWithFixedDelay(
        new ConnectionFinalizerRunnable(), 100, 100, TimeUnit.MILLISECONDS);
    // Register the set of supported LDAP versions.
    DirectoryServer.registerSupportedLDAPVersion(3, this);
    if (config.isAllowLDAPV2())
    {
      DirectoryServer.registerSupportedLDAPVersion(2, this);
    @Override
    public Collection<String> getEnabledSSLProtocols() {
        final SSLEngine engine = sslEngine;
        if (engine != null) {
            return Arrays.asList(engine.getEnabledProtocols());
        }
        return super.getEnabledSSLProtocols();
    }
    // Create and register monitors.
    statTracker = new LDAPStatistics(handlerName + " Statistics");
    DirectoryServer.registerMonitorProvider(statTracker);
    connMonitor = new ClientConnectionMonitorProvider(this);
    DirectoryServer.registerMonitorProvider(connMonitor);
    // Register this as a change listener.
    config.addLDAPChangeListener(this);
  }
  @Override
  public boolean isConfigurationAcceptable(ConnectionHandlerCfg configuration,
      List<LocalizableMessage> unacceptableReasons)
  {
    LDAPConnectionHandlerCfg config = (LDAPConnectionHandlerCfg) configuration;
    if (currentConfig == null
        || (!currentConfig.isEnabled() && config.isEnabled()))
    {
      // Attempt to bind to the listen port on all configured addresses to
      // verify whether the connection handler will be able to start.
      LocalizableMessage errorMessage =
          checkAnyListenAddressInUse(config.getListenAddress(), config
              .getListenPort(), config.isAllowTCPReuseAddress(), config.dn());
      if (errorMessage != null)
      {
        unacceptableReasons.add(errorMessage);
        return false;
      }
    @Override
    public Collection<HostPort> getListeners() {
        return listeners;
    }
    if (config.isEnabled()
    /**
     * Retrieves the maximum length of time in milliseconds that attempts to write to LDAP client connections should be
     * allowed to block.
     *
     * @return The maximum length of time in milliseconds that attempts to write to LDAP client connections should be
     *         allowed to block, or zero if there should not be any limit imposed.
     */
    public long getMaxBlockedWriteTimeLimit() {
        return currentConfig.getMaxBlockedWriteTimeLimit();
    }
    /**
     * Retrieves the maximum ASN.1 element value length that will be allowed by this connection handler.
     *
     * @return The maximum ASN.1 element value length that will be allowed by this connection handler.
     */
    public int getMaxRequestSize() {
        return (int) currentConfig.getMaxRequestSize();
    }
    /**
     * Retrieves the size in bytes of the LDAP response message write buffer defined for this connection handler.
     *
     * @return The size in bytes of the LDAP response message write buffer.
     */
    public int getBufferSize() {
        return (int) currentConfig.getBufferSize();
    }
    @Override
    public String getProtocol() {
        return protocol;
    }
    @Override
    public String getShutdownListenerName() {
        return handlerName;
    }
    /**
     * Retrieves the SSL client authentication policy for this connection handler.
     *
     * @return The SSL client authentication policy for this connection handler.
     */
    public SSLClientAuthPolicy getSSLClientAuthPolicy() {
        return sslClientAuthPolicy;
    }
    /**
     * Retrieves the set of statistics maintained by this connection handler.
     *
     * @return The set of statistics maintained by this connection handler.
     */
    public LDAPStatistics getStatTracker() {
        return statTracker;
    }
    @Override
    public void initializeConnectionHandler(ServerContext serverContext, LDAPConnectionHandlerCfg config)
            throws ConfigException, InitializationException {
        if (friendlyName == null) {
            friendlyName = config.dn().rdn().getFirstAVA().getAttributeValue().toString();
        }
        // Save this configuration for future reference.
        currentConfig = config;
        enabled = config.isEnabled();
        allowedClients = config.getAllowedClient();
        deniedClients = config.getDeniedClient();
        // Configure SSL if needed.
        try {
            // This call may disable the connector if wrong SSL settings
            configureSSL(config);
        } catch (DirectoryException e) {
            logger.traceException(e);
            throw new InitializationException(e.getMessageObject());
        }
        // Save properties that cannot be dynamically modified.
        allowReuseAddress = config.isAllowTCPReuseAddress();
        backlog = config.getAcceptBacklog();
        listenAddresses = new HashSet<>();
        for (InetAddress addr : config.getListenAddress()) {
            listenAddresses.add(new InetSocketAddress(addr, config.getListenPort()));
        }
        // Construct a unique name for this connection handler, and put
        // together the set of listeners.
        listeners = new LinkedList<>();
        StringBuilder nameBuffer = new StringBuilder();
        nameBuffer.append(friendlyName);
        for (InetSocketAddress a : listenAddresses) {
            listeners.add(new HostPort(a.getHostName(), a.getPort()));
            nameBuffer.append(" ");
            nameBuffer.append(a.getHostName());
        }
        nameBuffer.append(" port ");
        nameBuffer.append(config.getListenPort());
        handlerName = nameBuffer.toString();
        // Attempt to bind to the listen port on all configured addresses to
        // verify whether the connection handler will be able to start.
        LocalizableMessage errorMessage = checkAnyListenAddressInUse(config.getListenAddress(), config.getListenPort(),
                allowReuseAddress, config.dn());
        if (errorMessage != null) {
            logger.error(errorMessage);
            throw new InitializationException(errorMessage);
        }
        // Create a system property to store the LDAP(S) port the server is
        // listening to. This information can be displayed with jinfo.
        System.setProperty(protocol + "_port", String.valueOf(config.getListenPort()));
        // Create and start a connection finalizer thread for this
        // connection handler.
        connectionFinalizer = Executors.newSingleThreadScheduledExecutor(new DirectoryThread.Factory(
                "LDAP Connection Finalizer for connection handler " + toString()));
        connectionFinalizerActiveJobQueue = new ArrayList<>();
        connectionFinalizerPendingJobQueue = new ArrayList<>();
        connectionFinalizer.scheduleWithFixedDelay(new ConnectionFinalizerRunnable(), 100, 100, TimeUnit.MILLISECONDS);
        // Register the set of supported LDAP versions.
        DirectoryServer.registerSupportedLDAPVersion(3, this);
        if (config.isAllowLDAPV2()) {
            DirectoryServer.registerSupportedLDAPVersion(2, this);
        }
        // Create and register monitors.
        statTracker = new LDAPStatistics(handlerName + " Statistics");
        DirectoryServer.registerMonitorProvider(statTracker);
        connMonitor = new ClientConnectionMonitorProvider(this);
        DirectoryServer.registerMonitorProvider(connMonitor);
        // Register this as a change listener.
        config.addLDAPChangeListener(this);
    }
    @Override
    public boolean isConfigurationAcceptable(ConnectionHandlerCfg configuration,
            List<LocalizableMessage> unacceptableReasons) {
        LDAPConnectionHandlerCfg config = (LDAPConnectionHandlerCfg) configuration;
        if (currentConfig == null || (!currentConfig.isEnabled() && config.isEnabled())) {
            // Attempt to bind to the listen port on all configured addresses to
            // verify whether the connection handler will be able to start.
            LocalizableMessage errorMessage = checkAnyListenAddressInUse(config.getListenAddress(),
                    config.getListenPort(), config.isAllowTCPReuseAddress(), config.dn());
            if (errorMessage != null) {
                unacceptableReasons.add(errorMessage);
                return false;
            }
        }
        if (config.isEnabled()
        // Check that the SSL configuration is valid.
        && (config.isUseSSL() || config.isAllowStartTLS()))
    {
      try
      {
        createSSLEngine(config, createSSLContext(config));
      }
      catch (DirectoryException e)
      {
        logger.traceException(e);
                && (config.isUseSSL() || config.isAllowStartTLS())) {
            try {
                createSSLEngine(config, createSSLContext(config));
            } catch (DirectoryException e) {
                logger.traceException(e);
        unacceptableReasons.add(e.getMessageObject());
        return false;
      }
    }
    return true;
  }
  /**
   * Checks whether any listen address is in use for the given port. The check
   * is performed by binding to each address and port.
   *
   * @param listenAddresses
   *          the listen {@link InetAddress} to test
   * @param listenPort
   *          the listen port to test
   * @param allowReuseAddress
   *          whether addresses can be reused
   * @param configEntryDN
   *          the configuration entry DN
   * @return an error message if at least one of the address is already in use,
   *         null otherwise.
   */
  private LocalizableMessage checkAnyListenAddressInUse(
      Collection<InetAddress> listenAddresses, int listenPort,
      boolean allowReuseAddress, DN configEntryDN)
  {
    for (InetAddress a : listenAddresses)
    {
      try
      {
        if (StaticUtils.isAddressInUse(a, listenPort, allowReuseAddress))
        {
          throw new IOException(ERR_CONNHANDLER_ADDRESS_INUSE.get().toString());
                unacceptableReasons.add(e.getMessageObject());
                return false;
            }
        }
      }
      catch (IOException e)
      {
        logger.traceException(e);
        return ERR_CONNHANDLER_CANNOT_BIND.get("LDAP", configEntryDN, a.getHostAddress(), listenPort,
            getExceptionMessage(e));
      }
        return true;
    }
    return null;
  }
  @Override
  public boolean isConfigurationChangeAcceptable(
      LDAPConnectionHandlerCfg config, List<LocalizableMessage> unacceptableReasons)
  {
    return isConfigurationAcceptable(config, unacceptableReasons);
  }
  @Override
  public void processServerShutdown(LocalizableMessage reason)
  {
    shutdownRequested = true;
  }
  void stopListener()
  {
    if (listener != null)
    {
      listener.close();
      listener = null;
    /**
     * Checks whether any listen address is in use for the given port. The check is performed by binding to each address
     * and port.
     *
     * @param listenAddresses
     *            the listen {@link InetAddress} to test
     * @param listenPort
     *            the listen port to test
     * @param allowReuseAddress
     *            whether addresses can be reused
     * @param configEntryDN
     *            the configuration entry DN
     * @return an error message if at least one of the address is already in use, null otherwise.
     */
    private LocalizableMessage checkAnyListenAddressInUse(Collection<InetAddress> listenAddresses, int listenPort,
            boolean allowReuseAddress, DN configEntryDN) {
        for (InetAddress a : listenAddresses) {
            try {
                if (StaticUtils.isAddressInUse(a, listenPort, allowReuseAddress)) {
                    throw new IOException(ERR_CONNHANDLER_ADDRESS_INUSE.get().toString());
                }
            } catch (IOException e) {
                logger.traceException(e);
                return ERR_CONNHANDLER_CANNOT_BIND.get("LDAP", configEntryDN, a.getHostAddress(), listenPort,
                        getExceptionMessage(e));
            }
        }
        return null;
    }
  }
  private void startListener() throws IOException
  {
    listener = new GrizzlyLDAPListener(
            listenAddresses,
            Options.defaultOptions()
                   .set(LDAPListener.CONNECT_MAX_BACKLOG, backlog)
                   .set(LDAPListener.REQUEST_MAX_SIZE_IN_BYTES, (int) currentConfig.getMaxRequestSize()),
                   new Function<LDAPClientContext,
                                ReactiveHandler<LDAPClientContext,LdapRawMessage, Stream<Response>>,
                                LdapException>() {
    @Override
    public boolean isConfigurationChangeAcceptable(LDAPConnectionHandlerCfg config,
            List<LocalizableMessage> unacceptableReasons) {
        return isConfigurationAcceptable(config, unacceptableReasons);
    }
    @Override
    public void processServerShutdown(LocalizableMessage reason) {
        shutdownRequested = true;
    }
    void stopListener() {
        if (listener != null) {
            listener.close();
            listener = null;
        }
    }
    private void startListener() throws IOException {
        listener = new GrizzlyLDAPListener(
                listenAddresses,
                Options.defaultOptions().set(LDAPListener.CONNECT_MAX_BACKLOG, backlog)
                        .set(LDAPListener.REQUEST_MAX_SIZE_IN_BYTES, (int) currentConfig.getMaxRequestSize()),
                new Function<LDAPClientContext,
                             ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
                             LdapException>() {
                    @Override
                    public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>
                        apply(LDAPClientContext clientContext) throws LdapException {
                    public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> apply(
                            LDAPClientContext clientContext) throws LdapException {
                        final LDAPClientConnection2 conn = canAccept(clientContext);
                        return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
                            @Override
                            public Single<Stream<Response>> handle(LDAPClientContext context,
                                    LdapRawMessage request) throws Exception {
                            public Single<Stream<Response>> handle(LDAPClientContext context, LdapRawMessage request)
                                    throws Exception {
                                return conn.handle(queueingStrategy, request);
                            }
                        };
                    }
                });
  }
    }
  /**
   * Operates in a loop, accepting new connections and ensuring that requests on
   * those connections are handled properly.
   */
  @Override
  public void run()
  {
    setName(handlerName);
    boolean starting = true;
    setName(handlerName);
    /**
     * Operates in a loop, accepting new connections and ensuring that requests on those connections are handled
     * properly.
     */
    @Override
    public void run() {
        setName(handlerName);
        boolean starting = true;
        setName(handlerName);
    boolean lastIterationFailed = false;
        boolean lastIterationFailed = false;
    while (!shutdownRequested)
    {
      // If this connection handler is not enabled, then just sleep for a bit and check again.
      if (!this.enabled)
      {
        if (listener != null)
        {
          stopListener();
        while (!shutdownRequested) {
            // If this connection handler is not enabled, then just sleep for a bit and check again.
            if (!this.enabled) {
                if (listener != null) {
                    stopListener();
                }
                if (starting) {
                    // This may happen if there was an initialisation error which led to disable the connector.
                    // The main thread is waiting for the connector to listen on its port, which will not occur yet,
                    // so notify here to allow the server startup to complete.
                    synchronized (waitListen) {
                        starting = false;
                        waitListen.notify();
                    }
                }
                StaticUtils.sleep(1000);
                continue;
            }
            if (listener != null) {
                // If already listening, then sleep for a bit and check again.
                StaticUtils.sleep(1000);
                continue;
            }
            try {
                // At this point, the connection Handler either started correctly or failed
                // to start but the start process should be notified and resume its work in any cases.
                synchronized (waitListen) {
                    waitListen.notify();
                }
                // If we have gotten here, then we are about to start listening
                // for the first time since startup or since we were previously disabled.
                // Start the embedded HTTP server
                startListener();
                lastIterationFailed = false;
            } catch (Exception e) {
                // Clean up the messed up HTTP server
                stopListener();
                // Error + alert about the horked config
                logger.traceException(e);
                logger.error(ERR_CONNHANDLER_CANNOT_ACCEPT_CONNECTION, friendlyName, currentConfig.dn(),
                        getExceptionMessage(e));
                if (lastIterationFailed) {
                    // The last time through the accept loop we also encountered a failure.
                    // Rather than enter a potential infinite loop of failures,
                    // disable this acceptor and log an error.
                    LocalizableMessage message = ERR_CONNHANDLER_CONSECUTIVE_ACCEPT_FAILURES.get(friendlyName,
                            currentConfig.dn(), stackTraceToSingleLineString(e));
                    logger.error(message);
                    DirectoryServer.sendAlertNotification(this,
                            ALERT_TYPE_HTTP_CONNECTION_HANDLER_CONSECUTIVE_FAILURES, message);
                    this.enabled = false;
                } else {
                    lastIterationFailed = true;
                }
            }
        }
        if (starting)
        {
          // This may happen if there was an initialisation error which led to disable the connector.
          // The main thread is waiting for the connector to listen on its port, which will not occur yet,
          // so notify here to allow the server startup to complete.
          synchronized (waitListen)
          {
            starting = false;
            waitListen.notify();
          }
        }
        StaticUtils.sleep(1000);
        continue;
      }
      if (listener != null)
      {
        // If already listening, then sleep for a bit and check again.
        StaticUtils.sleep(1000);
        continue;
      }
      try
      {
        // At this point, the connection Handler either started correctly or failed
        // to start but the start process should be notified and resume its work in any cases.
        synchronized (waitListen)
        {
          waitListen.notify();
        }
        // If we have gotten here, then we are about to start listening
        // for the first time since startup or since we were previously disabled.
        // Start the embedded HTTP server
        startListener();
        lastIterationFailed = false;
      }
      catch (Exception e)
      {
        // Clean up the messed up HTTP server
        // Initiate shutdown
        stopListener();
    }
        // Error + alert about the horked config
        logger.traceException(e);
        logger.error(
            ERR_CONNHANDLER_CANNOT_ACCEPT_CONNECTION, friendlyName, currentConfig.dn(), getExceptionMessage(e));
        if (lastIterationFailed)
        {
          // The last time through the accept loop we also encountered a failure.
          // Rather than enter a potential infinite loop of failures,
          // disable this acceptor and log an error.
          LocalizableMessage message = ERR_CONNHANDLER_CONSECUTIVE_ACCEPT_FAILURES.get(
              friendlyName, currentConfig.dn(), stackTraceToSingleLineString(e));
          logger.error(message);
          DirectoryServer.sendAlertNotification(this, ALERT_TYPE_HTTP_CONNECTION_HANDLER_CONSECUTIVE_FAILURES, message);
          this.enabled = false;
    private LDAPClientConnection2 canAccept(LDAPClientContext clientContext) throws LdapException {
        // Check to see if the core server rejected the
        // connection (e.g., already too many connections
        // established).
        final LDAPClientConnection2 clientConnection = new LDAPClientConnection2(this, clientContext, getProtocol(),
                currentConfig.isKeepStats());
        if (clientConnection.getConnectionID() < 0) {
            clientConnection.disconnect(DisconnectReason.ADMIN_LIMIT_EXCEEDED, true,
                    ERR_CONNHANDLER_REJECTED_BY_SERVER.get());
            throw LdapException.newLdapException(ResultCode.ADMIN_LIMIT_EXCEEDED);
        }
        else
        {
          lastIterationFailed = true;
        InetAddress clientAddr = clientConnection.getRemoteAddress();
        // Check to see if the client is on the denied list.
        // If so, then reject it immediately.
        if (!deniedClients.isEmpty() && AddressMask.matchesAny(deniedClients, clientAddr)) {
            clientConnection.disconnect(
                    DisconnectReason.CONNECTION_REJECTED,
                    currentConfig.isSendRejectionNotice(),
                    ERR_CONNHANDLER_DENIED_CLIENT.get(clientConnection.getClientHostPort(),
                            clientConnection.getServerHostPort()));
            throw LdapException.newLdapException(ResultCode.CONSTRAINT_VIOLATION);
        }
      }
    }
        // Check to see if there is an allowed list and if
        // there is whether the client is on that list. If
        // not, then reject the connection.
        if (!allowedClients.isEmpty() && !AddressMask.matchesAny(allowedClients, clientAddr)) {
            clientConnection.disconnect(
                    DisconnectReason.CONNECTION_REJECTED,
                    currentConfig.isSendRejectionNotice(),
                    ERR_CONNHANDLER_DISALLOWED_CLIENT.get(clientConnection.getClientHostPort(),
                            clientConnection.getServerHostPort()));
            throw LdapException.newLdapException(ResultCode.CONSTRAINT_VIOLATION);
        }
    // Initiate shutdown
    stopListener();
  }
  private LDAPClientConnection2 canAccept(LDAPClientContext clientContext) throws LdapException
  {
    // Check to see if the core server rejected the
    // connection (e.g., already too many connections
    // established).
    final LDAPClientConnection2 clientConnection =
            new LDAPClientConnection2(this, clientContext, getProtocol(), currentConfig.isKeepStats());
    if (clientConnection.getConnectionID() < 0)
    {
      clientConnection.disconnect(
          DisconnectReason.ADMIN_LIMIT_EXCEEDED, true, ERR_CONNHANDLER_REJECTED_BY_SERVER.get());
      throw LdapException.newLdapException(ResultCode.ADMIN_LIMIT_EXCEEDED);
    }
    InetAddress clientAddr = clientConnection.getRemoteAddress();
    // Check to see if the client is on the denied list.
    // If so, then reject it immediately.
    if (!deniedClients.isEmpty()
        && AddressMask.matchesAny(deniedClients, clientAddr))
    {
      clientConnection.disconnect(DisconnectReason.CONNECTION_REJECTED,
          currentConfig.isSendRejectionNotice(), ERR_CONNHANDLER_DENIED_CLIENT
              .get(clientConnection.getClientHostPort(), clientConnection
                  .getServerHostPort()));
      throw LdapException.newLdapException(ResultCode.CONSTRAINT_VIOLATION);
    }
    // Check to see if there is an allowed list and if
    // there is whether the client is on that list. If
    // not, then reject the connection.
    if (!allowedClients.isEmpty()
        && !AddressMask.matchesAny(allowedClients, clientAddr))
    {
      clientConnection.disconnect(DisconnectReason.CONNECTION_REJECTED,
          currentConfig.isSendRejectionNotice(),
          ERR_CONNHANDLER_DISALLOWED_CLIENT.get(clientConnection
              .getClientHostPort(), clientConnection.getServerHostPort()));
      throw LdapException.newLdapException(ResultCode.CONSTRAINT_VIOLATION);
    }
    // If we've gotten here, then we'll take the
    // connection so invoke the post-connect plugins and
    // register the client connection with a request
    // handler.
    try
    {
      PluginConfigManager pluginManager = DirectoryServer
          .getPluginConfigManager();
      PluginResult.PostConnect pluginResult = pluginManager
          .invokePostConnectPlugins(clientConnection);
      if (!pluginResult.continueProcessing())
      {
        clientConnection.disconnect(pluginResult.getDisconnectReason(),
            pluginResult.sendDisconnectNotification(),
            pluginResult.getErrorMessage());
        throw LdapException.newLdapException(ResultCode.CONSTRAINT_VIOLATION);
      }
    }
    catch (Exception e)
    {
      logger.traceException(e);
      LocalizableMessage message =
          INFO_CONNHANDLER_UNABLE_TO_REGISTER_CLIENT.get(clientConnection
              .getClientHostPort(), clientConnection.getServerHostPort(),
              getExceptionMessage(e));
      logger.debug(message);
      clientConnection.disconnect(DisconnectReason.SERVER_ERROR,
          currentConfig.isSendRejectionNotice(), message);
      throw LdapException.newLdapException(ResultCode.OPERATIONS_ERROR);
    }
    if (useSSL()) {
        // If we've gotten here, then we'll take the
        // connection so invoke the post-connect plugins and
        // register the client connection with a request
        // handler.
        try {
            clientContext.enableTLS(createSSLEngine());
        } catch (DirectoryException e) {
            throw LdapException.newLdapException(e.getResultCode(), e);
            PluginConfigManager pluginManager = DirectoryServer.getPluginConfigManager();
            PluginResult.PostConnect pluginResult = pluginManager.invokePostConnectPlugins(clientConnection);
            if (!pluginResult.continueProcessing()) {
                clientConnection.disconnect(pluginResult.getDisconnectReason(),
                        pluginResult.sendDisconnectNotification(), pluginResult.getErrorMessage());
                throw LdapException.newLdapException(ResultCode.CONSTRAINT_VIOLATION);
            }
        } catch (Exception e) {
            logger.traceException(e);
            LocalizableMessage message = INFO_CONNHANDLER_UNABLE_TO_REGISTER_CLIENT.get(
                    clientConnection.getClientHostPort(), clientConnection.getServerHostPort(), getExceptionMessage(e));
            logger.debug(message);
            clientConnection.disconnect(DisconnectReason.SERVER_ERROR, currentConfig.isSendRejectionNotice(), message);
            throw LdapException.newLdapException(ResultCode.OPERATIONS_ERROR);
        }
        if (useSSL()) {
            try {
                clientContext.enableTLS(createSSLEngine());
            } catch (DirectoryException e) {
                throw LdapException.newLdapException(e.getResultCode(), e);
            }
        }
        return clientConnection;
    }
    /**
     * Appends a string representation of this connection handler to the provided buffer.
     *
     * @param buffer
     *            The buffer to which the information should be appended.
     */
    @Override
    public void toString(StringBuilder buffer) {
        buffer.append(handlerName);
    }
    /**
     * Indicates whether this connection handler should use SSL to communicate with clients.
     *
     * @return {@code true} if this connection handler should use SSL to communicate with clients, or {@code false} if
     *         not.
     */
    public boolean useSSL() {
        return currentConfig.isUseSSL();
    }
    SSLEngine createSSLEngine() throws DirectoryException {
        return createSSLEngine(currentConfig, sslContext);
    }
    private SSLEngine createSSLEngine(LDAPConnectionHandlerCfg config, SSLContext sslContext)
            throws DirectoryException {
        try {
            SSLEngine sslEngine = sslContext.createSSLEngine();
            sslEngine.setUseClientMode(false);
            final Set<String> protocols = config.getSSLProtocol();
            if (!protocols.isEmpty()) {
                sslEngine.setEnabledProtocols(protocols.toArray(new String[0]));
            }
            final Set<String> ciphers = config.getSSLCipherSuite();
            if (!ciphers.isEmpty()) {
                sslEngine.setEnabledCipherSuites(ciphers.toArray(new String[0]));
            }
            switch (config.getSSLClientAuthPolicy()) {
            case DISABLED:
                sslEngine.setNeedClientAuth(false);
                sslEngine.setWantClientAuth(false);
                break;
            case REQUIRED:
                sslEngine.setWantClientAuth(true);
                sslEngine.setNeedClientAuth(true);
                break;
            case OPTIONAL:
            default:
                sslEngine.setNeedClientAuth(false);
                sslEngine.setWantClientAuth(true);
                break;
            }
            return sslEngine;
        } catch (Exception e) {
            logger.traceException(e);
            ResultCode resCode = DirectoryServer.getServerErrorResultCode();
            LocalizableMessage message = ERR_CONNHANDLER_SSL_CANNOT_INITIALIZE.get(getExceptionMessage(e));
            throw new DirectoryException(resCode, message, e);
        }
    }
    return clientConnection;
  }
  /**
   * Appends a string representation of this connection handler to the provided
   * buffer.
   *
   * @param buffer
   *          The buffer to which the information should be appended.
   */
  @Override
  public void toString(StringBuilder buffer)
  {
    buffer.append(handlerName);
  }
  /**
   * Indicates whether this connection handler should use SSL to communicate
   * with clients.
   *
   * @return {@code true} if this connection handler should use SSL to
   *         communicate with clients, or {@code false} if not.
   */
  public boolean useSSL()
  {
    return currentConfig.isUseSSL();
  }
  SSLEngine createSSLEngine() throws DirectoryException {
      return createSSLEngine(currentConfig, sslContext);
  }
  private SSLEngine createSSLEngine(LDAPConnectionHandlerCfg config, SSLContext sslContext) throws DirectoryException
  {
    try
    {
      SSLEngine sslEngine = sslContext.createSSLEngine();
      sslEngine.setUseClientMode(false);
      final Set<String> protocols = config.getSSLProtocol();
      if (!protocols.isEmpty())
      {
        sslEngine.setEnabledProtocols(protocols.toArray(new String[0]));
      }
      final Set<String> ciphers = config.getSSLCipherSuite();
      if (!ciphers.isEmpty())
      {
        sslEngine.setEnabledCipherSuites(ciphers.toArray(new String[0]));
      }
      switch (config.getSSLClientAuthPolicy())
      {
      case DISABLED:
        sslEngine.setNeedClientAuth(false);
        sslEngine.setWantClientAuth(false);
        break;
      case REQUIRED:
        sslEngine.setWantClientAuth(true);
        sslEngine.setNeedClientAuth(true);
        break;
      case OPTIONAL:
      default:
        sslEngine.setNeedClientAuth(false);
        sslEngine.setWantClientAuth(true);
        break;
      }
      return sslEngine;
    }
    catch (Exception e)
    {
      logger.traceException(e);
      ResultCode resCode = DirectoryServer.getServerErrorResultCode();
      LocalizableMessage message = ERR_CONNHANDLER_SSL_CANNOT_INITIALIZE
          .get(getExceptionMessage(e));
      throw new DirectoryException(resCode, message, e);
    }
  }
  private void disableAndWarnIfUseSSL(LDAPConnectionHandlerCfg config)
  {
    if (config.isUseSSL())
    {
      logger.warn(INFO_DISABLE_CONNECTION, friendlyName);
      enabled = false;
    }
  }
  private SSLContext createSSLContext(LDAPConnectionHandlerCfg config)
      throws DirectoryException
  {
    try
    {
      DN keyMgrDN = config.getKeyManagerProviderDN();
      KeyManagerProvider<?> keyManagerProvider = DirectoryServer
          .getKeyManagerProvider(keyMgrDN);
      if (keyManagerProvider == null)
      {
        logger.error(ERR_NULL_KEY_PROVIDER_MANAGER, keyMgrDN, friendlyName);
        disableAndWarnIfUseSSL(config);
        keyManagerProvider = new NullKeyManagerProvider();
        // The SSL connection is unusable without a key manager provider
      }
      else if (! keyManagerProvider.containsAtLeastOneKey())
      {
        logger.error(ERR_INVALID_KEYSTORE, friendlyName);
        disableAndWarnIfUseSSL(config);
      }
      final SortedSet<String> aliases = new TreeSet<>(config.getSSLCertNickname());
      final KeyManager[] keyManagers;
      if (aliases.isEmpty())
      {
        keyManagers = keyManagerProvider.getKeyManagers();
      }
      else
      {
        final Iterator<String> it = aliases.iterator();
        while (it.hasNext())
        {
          if (!keyManagerProvider.containsKeyWithAlias(it.next()))
          {
            logger.error(ERR_KEYSTORE_DOES_NOT_CONTAIN_ALIAS, aliases, friendlyName);
            it.remove();
          }
    private void disableAndWarnIfUseSSL(LDAPConnectionHandlerCfg config) {
        if (config.isUseSSL()) {
            logger.warn(INFO_DISABLE_CONNECTION, friendlyName);
            enabled = false;
        }
    }
        if (aliases.isEmpty())
        {
          disableAndWarnIfUseSSL(config);
    private SSLContext createSSLContext(LDAPConnectionHandlerCfg config) throws DirectoryException {
        try {
            DN keyMgrDN = config.getKeyManagerProviderDN();
            KeyManagerProvider<?> keyManagerProvider = DirectoryServer.getKeyManagerProvider(keyMgrDN);
            if (keyManagerProvider == null) {
                logger.error(ERR_NULL_KEY_PROVIDER_MANAGER, keyMgrDN, friendlyName);
                disableAndWarnIfUseSSL(config);
                keyManagerProvider = new NullKeyManagerProvider();
                // The SSL connection is unusable without a key manager provider
            } else if (!keyManagerProvider.containsAtLeastOneKey()) {
                logger.error(ERR_INVALID_KEYSTORE, friendlyName);
                disableAndWarnIfUseSSL(config);
            }
            final SortedSet<String> aliases = new TreeSet<>(config.getSSLCertNickname());
            final KeyManager[] keyManagers;
            if (aliases.isEmpty()) {
                keyManagers = keyManagerProvider.getKeyManagers();
            } else {
                final Iterator<String> it = aliases.iterator();
                while (it.hasNext()) {
                    if (!keyManagerProvider.containsKeyWithAlias(it.next())) {
                        logger.error(ERR_KEYSTORE_DOES_NOT_CONTAIN_ALIAS, aliases, friendlyName);
                        it.remove();
                    }
                }
                if (aliases.isEmpty()) {
                    disableAndWarnIfUseSSL(config);
                }
                keyManagers = SelectableCertificateKeyManager.wrap(keyManagerProvider.getKeyManagers(), aliases,
                        friendlyName);
            }
            DN trustMgrDN = config.getTrustManagerProviderDN();
            TrustManagerProvider<?> trustManagerProvider = DirectoryServer.getTrustManagerProvider(trustMgrDN);
            if (trustManagerProvider == null) {
                trustManagerProvider = new NullTrustManagerProvider();
            }
            SSLContext sslContext = SSLContext.getInstance(SSL_CONTEXT_INSTANCE_NAME);
            sslContext.init(keyManagers, trustManagerProvider.getTrustManagers(), null);
            return sslContext;
        } catch (Exception e) {
            logger.traceException(e);
            ResultCode resCode = DirectoryServer.getServerErrorResultCode();
            LocalizableMessage message = ERR_CONNHANDLER_SSL_CANNOT_INITIALIZE.get(getExceptionMessage(e));
            throw new DirectoryException(resCode, message, e);
        }
        keyManagers = SelectableCertificateKeyManager.wrap(keyManagerProvider.getKeyManagers(), aliases, friendlyName);
      }
      DN trustMgrDN = config.getTrustManagerProviderDN();
      TrustManagerProvider<?> trustManagerProvider = DirectoryServer
          .getTrustManagerProvider(trustMgrDN);
      if (trustManagerProvider == null)
      {
        trustManagerProvider = new NullTrustManagerProvider();
      }
      SSLContext sslContext = SSLContext.getInstance(SSL_CONTEXT_INSTANCE_NAME);
      sslContext.init(keyManagers, trustManagerProvider.getTrustManagers(),
          null);
      return sslContext;
    }
    catch (Exception e)
    {
      logger.traceException(e);
      ResultCode resCode = DirectoryServer.getServerErrorResultCode();
      LocalizableMessage message = ERR_CONNHANDLER_SSL_CANNOT_INITIALIZE
          .get(getExceptionMessage(e));
      throw new DirectoryException(resCode, message, e);
    }
  }
  /**
   * Enqueue a connection finalizer which will be invoked after a short delay.
   *
   * @param r
   *          The connection finalizer runnable.
   */
  void registerConnectionFinalizer(Runnable r)
  {
    synchronized (connectionFinalizerLock)
    {
      if (connectionFinalizer != null)
      {
        connectionFinalizerPendingJobQueue.add(r);
      }
      else
      {
        // Already finalized - invoked immediately.
        r.run();
      }
    /**
     * Enqueue a connection finalizer which will be invoked after a short delay.
     *
     * @param r
     *            The connection finalizer runnable.
     */
    void registerConnectionFinalizer(Runnable r) {
        synchronized (connectionFinalizerLock) {
            if (connectionFinalizer != null) {
                connectionFinalizerPendingJobQueue.add(r);
            } else {
                // Already finalized - invoked immediately.
                r.run();
            }
        }
    }
  }
}
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/ProxyToHandler.java
File was deleted
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Route.java
File was deleted
opendj-server-legacy/src/main/java/org/opends/server/core/LdapEndpointConfigManager.java
File was deleted