mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
12.17.2013 3945d5b6c83388fbdeb0cf0c85052563badff04f
Fix OPENDJ-1247: Client side timeouts do not cancel bind or startTLS requests properly

* fail the connection when a bind or startTLS request times out
* ensure that abandon requests are sent for other types of operation
* fix a bug which meant that the timeout checker could fail to enforce timeouts for the first LDAP connection
* minor improvements: only register connections which have a non-zero timeout; remove duplicate requestID field from LDAP future implementation
* added unit tests. These tests are a bit too functional at the moment and will be split up in a subsequent change.
1 file added
8 files modified
741 ■■■■ changed files
opendj-core/src/main/java/org/forgerock/opendj/ldap/TimeoutChecker.java 82 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/AbstractLDAPFutureResultImpl.java 64 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LDAPBindFutureResultImpl.java 9 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LDAPExtendedFutureResultImpl.java 7 ●●●● patch | view | raw | blame | history
opendj-grizzly/pom.xml 17 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnection.java 159 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java 5 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/resources/com/forgerock/opendj/grizzly/grizzly.properties 33 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java 365 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/TimeoutChecker.java
@@ -24,7 +24,6 @@
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.util.StaticUtils.DEFAULT_LOG;
@@ -40,8 +39,6 @@
 * All listeners registered with the {@code #addListener()} method are called
 * back with {@code TimeoutEventListener#handleTimeout()} to be able to handle
 * the timeout.
 * <p>
 *
 */
public final class TimeoutChecker {
    /**
@@ -63,12 +60,11 @@
    /**
     * Condition variable used for coordinating the timeout thread.
     */
    private final Object available = new Object();
    private final Object stateLock = new Object();
    /**
     * The listener set must be safe from CMEs.
     * For example, if the listener is a connection, expiring requests can
     * cause the connection to be closed.
     * The listener set must be safe from CMEs. For example, if the listener is
     * a connection, expiring requests can cause the connection to be closed.
     */
    private final Set<TimeoutEventListener> listeners =
            newSetFromMap(new ConcurrentHashMap<TimeoutEventListener, Boolean>());
@@ -78,34 +74,53 @@
     */
    private volatile boolean shutdownRequested = false;
    /**
     * Contains the minimum delay for listeners which were added while the
     * timeout checker was not sleeping (i.e. while it was processing listeners).
     */
    private volatile long pendingListenerMinDelay = Long.MAX_VALUE;
    private TimeoutChecker() {
        final Thread checkerThread = new Thread("OpenDJ LDAP SDK Timeout Checker") {
            @Override
            public void run() {
                DEFAULT_LOG.debug("Timeout Checker Starting");
                while (!shutdownRequested) {
                    /*
                     * New listeners may be added during iteration and may not
                     * be included in the computation of the new delay. This
                     * could potentially result in the timeout checker waiting
                     * longer than it should, or even forever (e.g. if the new
                     * listener is the first).
                     */
                    final long currentTime = System.currentTimeMillis();
                    long delay = 0;
                    long delay = Long.MAX_VALUE;
                    pendingListenerMinDelay = Long.MAX_VALUE;
                    for (final TimeoutEventListener listener : listeners) {
                        DEFAULT_LOG.trace("Checking connection {} delay = {}", listener, delay);
                        // May update the connections set.
                        final long newDelay = listener.handleTimeout(currentTime);
                        if (newDelay > 0) {
                            if (delay > 0) {
                                delay = Math.min(newDelay, delay);
                            } else {
                                delay = newDelay;
                            }
                            delay = Math.min(newDelay, delay);
                        }
                    }
                    try {
                        synchronized (available) {
                            if (delay <= 0) {
                                available.wait();
                        synchronized (stateLock) {
                            // Include any pending listener delays.
                            delay = Math.min(pendingListenerMinDelay, delay);
                            if (shutdownRequested) {
                                // Stop immediately.
                                break;
                            } else if (delay <= 0) {
                                /*
                                 * If there is at least one connection then the
                                 * delay should be > 0.
                                 */
                                stateLock.wait();
                            } else {
                                available.wait(delay);
                                stateLock.wait(delay);
                            }
                        }
                    } catch (final InterruptedException e) {
@@ -120,23 +135,31 @@
    }
    /**
     * Add a listener to check.
     * Registers a timeout event listener for timeout notification.
     *
     * @param listener
     *            listener to check for timeout event
     *            The timeout event listener.
     */
    public void addListener(final TimeoutEventListener listener) {
        listeners.add(listener);
        if (listener.getTimeout() > 0) {
            signal();
        /*
         * Only add the listener if it has a non-zero timeout. This assumes that
         * the timeout is fixed.
         */
        final long timeout = listener.getTimeout();
        if (timeout > 0) {
            listeners.add(listener);
            synchronized (stateLock) {
                pendingListenerMinDelay = Math.min(pendingListenerMinDelay, timeout);
                stateLock.notifyAll();
            }
        }
    }
    /**
     * Stop checking a listener.
     * Deregisters a timeout event listener for timeout notification.
     *
     * @param listener
     *            listener that was previously added to check for timeout event
     *            The timeout event listener.
     */
    public void removeListener(final TimeoutEventListener listener) {
        listeners.remove(listener);
@@ -144,14 +167,9 @@
    }
    private void shutdown() {
        shutdownRequested = true;
        signal();
    }
    // Wakes the timeout checker if it is sleeping.
    private void signal() {
        synchronized (available) {
            available.notifyAll();
        synchronized (stateLock) {
            shutdownRequested = true;
            stateLock.notifyAll();
        }
    }
}
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/AbstractLDAPFutureResultImpl.java
@@ -44,11 +44,10 @@
 * @param <S>
 *            The type of result returned by this future.
 */
public abstract class AbstractLDAPFutureResultImpl<S extends Result>
        extends AsynchronousFutureResult<S, ResultHandler<? super S>>
        implements IntermediateResponseHandler {
public abstract class AbstractLDAPFutureResultImpl<S extends Result> extends
        AsynchronousFutureResult<S, ResultHandler<? super S>> implements
        IntermediateResponseHandler {
    private final Connection connection;
    private final int requestID;
    private IntermediateResponseHandler intermediateResponseHandler;
    private volatile long timestamp;
@@ -66,24 +65,15 @@
     *            the connection to directory server
     */
    protected AbstractLDAPFutureResultImpl(final int requestID,
        final ResultHandler<? super S> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler,
        final Connection connection) {
        super(resultHandler);
        this.requestID = requestID;
            final ResultHandler<? super S> resultHandler,
            final IntermediateResponseHandler intermediateResponseHandler,
            final Connection connection) {
        super(resultHandler, requestID);
        this.connection = connection;
        this.intermediateResponseHandler = intermediateResponseHandler;
        this.timestamp = System.currentTimeMillis();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final int getRequestID() {
        return requestID;
    }
    /** {@inheritDoc} */
    @Override
    public final boolean handleIntermediateResponse(final IntermediateResponse response) {
@@ -107,14 +97,41 @@
     */
    @Override
    protected final ErrorResultException handleCancelRequest(final boolean mayInterruptIfRunning) {
        connection.abandonAsync(Requests.newAbandonRequest(requestID));
        /*
         * This will abandon the request, but will also recursively cancel this
         * future. There is no risk of an infinite loop because the state of
         * this future has already been changed.
         */
        connection.abandonAsync(Requests.newAbandonRequest(getRequestID()));
        return null;
    }
    @Override
    protected final boolean isCancelable() {
        /*
         * No other operations can be performed while a bind or startTLS
         * operations is active. Therefore it is not possible to cancel bind or
         * startTLS requests, since doing so will leave the connection in a
         * state which prevents other operations from being performed.
         */
        return !isBindOrStartTLS();
    }
    /**
     * Returns {@code true} if this future represents the result of a bind or
     * StartTLS request.
     * <p>
     * This default implementation always returns {@code false}; future
     * implementations which represent a bind or StartTLS request are expected
     * to override it. The value is used by {@link #isCancelable()}, which
     * reports such futures as not cancelable.
     *
     * @return {@code true} if this future represents the result of a bind or
     *         StartTLS request, {@code false} otherwise.
     */
    public boolean isBindOrStartTLS() {
        // Default: an ordinary request which may be cancelled.
        return false;
    }
    @Override
    protected void toString(final StringBuilder sb) {
        sb.append(" requestID = ");
        sb.append(requestID);
        sb.append(getRequestID());
        sb.append(" timestamp = ");
        sb.append(timestamp);
        super.toString(sb);
@@ -123,7 +140,8 @@
    /**
     * Sets the result associated to this future as an error result.
     *
     * @param result result of an operation
     * @param result
     *            result of an operation
     */
    public final void adaptErrorResult(final Result result) {
        final S errorResult =
@@ -152,12 +170,14 @@
     *            cause of the error
     * @return the error result
     */
    protected abstract S newErrorResult(ResultCode resultCode, String diagnosticMessage, Throwable cause);
    protected abstract S newErrorResult(ResultCode resultCode, String diagnosticMessage,
            Throwable cause);
    /**
     * Sets the result associated to this future.
     *
     * @param result the result of operation
     * @param result
     *            the result of operation
     */
    public final void setResultOrError(final S result) {
        if (result.getResultCode().isExceptional()) {
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LDAPBindFutureResultImpl.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS.
 *      Portions copyright 2011-2013 ForgeRock AS.
 */
package org.forgerock.opendj.ldap.spi;
@@ -64,12 +64,9 @@
        this.bindClient = bindClient;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    protected boolean isCancelable() {
        return false;
    public boolean isBindOrStartTLS() {
        return true;
    }
    @Override
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LDAPExtendedFutureResultImpl.java
@@ -81,12 +81,9 @@
        return sb.toString();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    protected boolean isCancelable() {
        return !request.getOID().equals(StartTLSExtendedRequest.OID);
    public boolean isBindOrStartTLS() {
        return request.getOID().equals(StartTLSExtendedRequest.OID);
    }
    /**
opendj-grizzly/pom.xml
@@ -79,6 +79,23 @@
  <build>
    <plugins>
      <plugin>
        <groupId>org.forgerock.commons</groupId>
        <artifactId>i18n-maven-plugin</artifactId>
        <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
              <goal>generate-messages</goal>
            </goals>
            <configuration>
              <messageFiles>
                <messageFile>com/forgerock/opendj/grizzly/grizzly.properties</messageFile>
              </messageFiles>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.felix</groupId>
        <artifactId>maven-bundle-plugin</artifactId>
        <extensions>true</extensions>
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnection.java
@@ -27,6 +27,9 @@
package org.forgerock.opendj.grizzly;
import static com.forgerock.opendj.grizzly.GrizzlyMessages.LDAP_CONNECTION_BIND_OR_START_TLS_CONNECTION_TIMEOUT;
import static com.forgerock.opendj.grizzly.GrizzlyMessages.LDAP_CONNECTION_BIND_OR_START_TLS_REQUEST_TIMEOUT;
import static com.forgerock.opendj.grizzly.GrizzlyMessages.LDAP_CONNECTION_REQUEST_TIMEOUT;
import static com.forgerock.opendj.util.StaticUtils.DEFAULT_LOG;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
@@ -66,7 +69,6 @@
import org.forgerock.opendj.ldap.requests.GenericBindRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.StartTLSExtendedRequest;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
@@ -142,40 +144,64 @@
    @Override
    public FutureResult<Void> abandonAsync(final AbandonRequest request) {
        final AbstractLDAPFutureResultImpl<?> pendingRequest;
        final int messageID = nextMsgID.getAndIncrement();
        /*
         * Need to be careful here since both abandonAsync and Future.cancel can
         * be called separately by the client application. Therefore
         * future.cancel() should abandon the request, and abandonAsync should
         * cancel the future. In addition, bind or StartTLS requests cannot be
         * abandoned.
         */
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                // Remove the future associated with the request to be abandoned.
                pendingRequest = pendingRequests.remove(request.getRequestID());
            }
            if (pendingRequest == null) {
                /*
                 * There has never been a request with the specified message ID
                 * or the response has already been received and handled. We can
                 * ignore this abandon request.
                 * If there is a bind or startTLS in progress then it must be
                 * this request which is being abandoned. The following check
                 * will prevent it from happening.
                 */
                // Message ID will be -1 since no request was sent.
                return new CompletedFutureResult<Void>((Void) null);
            }
            pendingRequest.cancel(false);
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter();
                try {
                    writer.writeAbandonRequest(messageID, request);
                    connection.write(writer.getASN1Writer().getBuffer(), null);
                    return new CompletedFutureResult<Void>((Void) null, messageID);
                } finally {
                    GrizzlyUtils.recycleWriter(writer);
                }
            } catch (final IOException e) {
                throw adaptRequestIOException(e);
                checkBindOrStartTLSInProgress();
            }
        } catch (final ErrorResultException e) {
            return new CompletedFutureResult<Void>(e, messageID);
            return new CompletedFutureResult<Void>(e);
        }
        // Remove the future associated with the request to be abandoned.
        final AbstractLDAPFutureResultImpl<?> pendingRequest =
                pendingRequests.remove(request.getRequestID());
        if (pendingRequest == null) {
            /*
             * There has never been a request with the specified message ID or
             * the response has already been received and handled. We can ignore
             * this abandon request.
             */
            return new CompletedFutureResult<Void>((Void) null);
        }
        /*
         * This will cancel the future, but will also recursively invoke this
         * method. Since the pending request has been removed, there is no risk
         * of an infinite loop.
         */
        pendingRequest.cancel(false);
        /*
         * FIXME: there's a potential race condition here if a bind or startTLS
         * is initiated just after we removed the pending request.
         */
        return sendAbandonRequest(request);
    }
    private FutureResult<Void> sendAbandonRequest(final AbandonRequest request) {
        final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter();
        try {
            final int messageID = nextMsgID.getAndIncrement();
            writer.writeAbandonRequest(messageID, request);
            connection.write(writer.getASN1Writer().getBuffer(), null);
            return new CompletedFutureResult<Void>((Void) null, messageID);
        } catch (final IOException e) {
            return new CompletedFutureResult<Void>(adaptRequestIOException(e));
        } finally {
            GrizzlyUtils.recycleWriter(writer);
        }
    }
@@ -563,28 +589,63 @@
    @Override
    public long handleTimeout(final long currentTime) {
        final long timeout = getTimeout();
        final long timeout = factory.getLDAPOptions().getTimeout(TimeUnit.MILLISECONDS);
        if (timeout <= 0) {
            return 0;
        }
        long delay = timeout;
        if (timeout > 0) {
            for (final int requestID : pendingRequests.keySet()) {
                final AbstractLDAPFutureResultImpl<?> future = pendingRequests.get(requestID);
                if (future != null && future.checkForTimeout()) {
                    final long diff = (future.getTimestamp() + timeout) - currentTime;
                    if (diff <= 0) {
                        if (pendingRequests.remove(requestID) != null) {
                            DEFAULT_LOG.debug("Cancelling expired future result: {}", future);
                            final Result result = Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT);
                            future.adaptErrorResult(result);
                            abandonAsync(Requests.newAbandonRequest(future.getRequestID()));
                        } else {
                            DEFAULT_LOG.debug(
                                    "Pending request {} has already been removed, not cancelling future result",
                                    requestID);
                        }
                    } else {
                        delay = Math.min(delay, diff);
                    }
                }
        for (final AbstractLDAPFutureResultImpl<?> future : pendingRequests.values()) {
            if (future == null || !future.checkForTimeout()) {
                continue;
            }
            final long diff = (future.getTimestamp() + timeout) - currentTime;
            if (diff > 0) {
                // Will expire in diff milliseconds.
                delay = Math.min(delay, diff);
            } else if (pendingRequests.remove(future.getRequestID()) == null) {
                // Result arrived at the same time.
                continue;
            } else if (future.isBindOrStartTLS()) {
                /*
                 * No other operations can be performed while a bind or StartTLS
                 * request is active, so we cannot time out the request. We
                 * therefore have a choice: either ignore timeouts for these
                 * operations, or enforce them but doing so requires
                 * invalidating the connection. We'll do the latter, since
                 * ignoring timeouts could cause the application to hang.
                 */
                DEFAULT_LOG.debug("Failing bind or StartTLS request due to timeout "
                        + "(connection will be invalidated): " + future);
                final Result result =
                        Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                                LDAP_CONNECTION_BIND_OR_START_TLS_REQUEST_TIMEOUT.get(timeout)
                                        .toString());
                future.adaptErrorResult(result);
                // Fail the connection.
                final Result errorResult =
                        Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                                LDAP_CONNECTION_BIND_OR_START_TLS_CONNECTION_TIMEOUT.get(timeout)
                                        .toString());
                connectionErrorOccurred(errorResult);
            } else {
                DEFAULT_LOG.debug("Failing request due to timeout: " + future);
                final Result result =
                        Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                                LDAP_CONNECTION_REQUEST_TIMEOUT.get(timeout).toString());
                future.adaptErrorResult(result);
                /*
                 * FIXME: there's a potential race condition here if a bind or
                 * startTLS is initiated just after we check the boolean. It
                 * seems potentially even more dangerous to send the abandon
                 * request while holding the state lock, since a blocking write
                 * could hang the application.
                 */
//              if (!bindOrStartTLSInProgress.get()) {
//                  sendAbandonRequest(newAbandonRequest(future.getRequestID()));
//              }
            }
        }
        return delay;
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java
@@ -34,6 +34,7 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -166,7 +167,9 @@
            connection.configureBlocking(true);
            final GrizzlyLDAPConnection ldapConnection =
                    new GrizzlyLDAPConnection(connection, GrizzlyLDAPConnectionFactory.this);
            timeoutChecker.get().addListener(ldapConnection);
            if (options.getTimeout(TimeUnit.MILLISECONDS) > 0) {
                timeoutChecker.get().addListener(ldapConnection);
            }
            clientFilter.registerConnection(connection, ldapConnection);
            return ldapConnection;
        }
opendj-grizzly/src/main/resources/com/forgerock/opendj/grizzly/grizzly.properties
New file
@@ -0,0 +1,33 @@
#
# CDDL HEADER START
#
# The contents of this file are subject to the terms of the
# Common Development and Distribution License, Version 1.0 only
# (the "License").  You may not use this file except in compliance
# with the License.
#
# You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
# or http://forgerock.org/license/CDDLv1.0.html.
# See the License for the specific language governing permissions
# and limitations under the License.
#
# When distributing Covered Code, include this CDDL HEADER in each
# file and include the License file at legal-notices/CDDLv1_0.txt.
# If applicable, add the following below this CDDL HEADER, with the
# fields enclosed by brackets "[]" replaced with your own identifying
# information:
#      Portions Copyright [yyyy] [name of copyright owner]
#
# CDDL HEADER END
#
#
#      Copyright 2013 ForgeRock AS.
#
LDAP_CONNECTION_REQUEST_TIMEOUT=The request has failed because no response \
 was received from the server within the %d ms timeout
LDAP_CONNECTION_BIND_OR_START_TLS_REQUEST_TIMEOUT=The bind or StartTLS request \
 has failed because no response was received from the server within the %d ms \
 timeout. The LDAP connection is now in an invalid state and can no longer be used
LDAP_CONNECTION_BIND_OR_START_TLS_CONNECTION_TIMEOUT=The LDAP connection has \
 failed because no bind or StartTLS response was received from the server \
 within the %d ms timeout
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java
@@ -35,27 +35,239 @@
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.opendj.ldap.Connection;
import org.forgerock.opendj.ldap.ConnectionEventListener;
import org.forgerock.opendj.ldap.ConnectionFactory;
import org.forgerock.opendj.ldap.Connections;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.ErrorResultException;
import org.forgerock.opendj.ldap.FutureResult;
import org.forgerock.opendj.ldap.IntermediateResponseHandler;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPConnectionFactory;
import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LDAPOptions;
import org.forgerock.opendj.ldap.MockConnectionEventListener;
import org.forgerock.opendj.ldap.ProviderNotFoundException;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.ResultHandler;
import org.forgerock.opendj.ldap.SdkTestCase;
import org.forgerock.opendj.ldap.SearchResultHandler;
import org.forgerock.opendj.ldap.ServerConnection;
import org.forgerock.opendj.ldap.ServerConnectionFactory;
import org.forgerock.opendj.ldap.TimeoutResultException;
import org.testng.annotations.Test;
import static org.fest.assertions.Fail.fail;
import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import java.util.concurrent.TimeoutException;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.Stubber;
import org.testng.annotations.AfterClass;
/**
 * Tests the {@link LDAPConnectionFactory} class.
 */
@SuppressWarnings("javadoc")
@SuppressWarnings({ "javadoc", "unchecked" })
public class GrizzlyLDAPConnectionFactoryTestCase extends SdkTestCase {
    // Number of connect/request cycles performed by each looping test.
    // Manual testing has gone up to 10000 iterations.
    private static final int ITERATIONS = 100;
    // Test timeout, in seconds, for tests which need to wait for network events.
    private static final long TEST_TIMEOUT = 30L;
    /*
     * It is usually quite a bad code smell to share state between unit tests.
     * However, in this case we want to re-use the same factories and listeners
     * in order to avoid shutting down and restarting the transport for each
     * iteration.
     */
    // One semaphore per server-side event. The mock server releases a permit
    // when it observes the event and the waitForXXX() helpers acquire it.
    private final Semaphore abandonLatch = new Semaphore(0);
    private final Semaphore bindLatch = new Semaphore(0);
    private final Semaphore closeLatch = new Semaphore(0);
    private final Semaphore connectLatch = new Semaphore(0);
    private final Semaphore searchLatch = new Semaphore(0);
    // Client context of the most recently accepted connection (set in handleAccept()).
    private final AtomicReference<LDAPClientContext> context =
            new AtomicReference<LDAPClientContext>();
    // server, factory and pool are initialised from one another and must stay in this order.
    private final LDAPListener server = createServer();
    // The 1 ms timeout makes any request which the mock server leaves
    // unanswered time out on the client side.
    private final ConnectionFactory factory = new LDAPConnectionFactory(server.getSocketAddress(),
            new LDAPOptions().setTimeout(1, TimeUnit.MILLISECONDS));
    // Fixed size pool of 10 connections layered on top of the factory above.
    private final ConnectionFactory pool = Connections.newFixedConnectionPool(factory, 10);
    // Mock returned by handleAccept(); replaced with a fresh mock by resetState().
    // NOTE(review): volatile presumably because handleAccept() runs on a transport thread - confirm.
    private volatile ServerConnection<Integer> serverConnection;
    @AfterClass
    public void tearDown() {
        pool.close();
        factory.close();
        server.close();
    }
    /**
     * Unit test for OPENDJ-1247: a locally timed out bind request will leave a
     * connection in an invalid state since a bind (or startTLS) is in progress
     * and no other operations can be performed. Therefore, a timeout should
     * cause the connection to become invalid and an appropriate connection
     * event sent. In addition, no abandon request should be sent.
     */
    @Test
    public void testClientSideTimeoutForBindRequest() throws Exception {
        resetState();
        registerBindEvent();
        registerCloseEvent();
        for (int i = 0; i < ITERATIONS; i++) {
            final Connection connection = factory.getConnection();
            try {
                waitForConnect();
                final MockConnectionEventListener listener = new MockConnectionEventListener();
                connection.addConnectionEventListener(listener);
                final ResultHandler<BindResult> handler = mock(ResultHandler.class);
                final FutureResult<BindResult> future =
                        connection.bindAsync(newSimpleBindRequest(), null, handler);
                waitForBind();
                // Wait for the request to timeout.
                try {
                    future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
                    fail("The bind request succeeded unexpectedly");
                } catch (TimeoutResultException e) {
                    verifyResultCodeIsClientSideTimeout(e);
                    verify(handler).handleErrorResult(same(e));
                    /*
                     * The connection should no longer be valid, the event
                     * listener should have been notified, but no abandon should
                     * have been sent.
                     */
                    listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
                    assertThat(connection.isValid()).isFalse();
                    verifyResultCodeIsClientSideTimeout(listener.getError());
                    connection.close();
                    waitForClose();
                    verifyNoAbandonSent();
                }
            } finally {
                connection.close();
            }
        }
    }
    /**
     * Unit test for OPENDJ-1247: as per previous test, except this time verify
     * that the connection failure removes the connection from a connection
     * pool.
     */
    @Test
    public void testClientSideTimeoutForBindRequestInConnectionPool() throws Exception {
        resetState();
        registerBindEvent();
        registerCloseEvent();
        for (int i = 0; i < ITERATIONS; i++) {
            final Connection connection = pool.getConnection();
            try {
                waitForConnect();
                final MockConnectionEventListener listener = new MockConnectionEventListener();
                connection.addConnectionEventListener(listener);
                // Now bind with timeout.
                final ResultHandler<BindResult> handler = mock(ResultHandler.class);
                final FutureResult<BindResult> future =
                        connection.bindAsync(newSimpleBindRequest(), null, handler);
                waitForBind();
                // Wait for the request to timeout.
                try {
                    future.get(5, TimeUnit.SECONDS);
                    fail("The bind request succeeded unexpectedly");
                } catch (TimeoutException e) {
                    fail("The bind request future get timed out");
                } catch (TimeoutResultException e) {
                    verifyResultCodeIsClientSideTimeout(e);
                    verify(handler).handleErrorResult(same(e));
                    /*
                     * The connection should no longer be valid, the event
                     * listener should have been notified, but no abandon should
                     * have been sent.
                     */
                    listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
                    assertThat(connection.isValid()).isFalse();
                    verifyResultCodeIsClientSideTimeout(listener.getError());
                    connection.close();
                    waitForClose();
                    verifyNoAbandonSent();
                }
            } finally {
                connection.close();
            }
        }
    }
    /**
     * Unit test for OPENDJ-1247: a locally timed out request which is not a
     * bind or startTLS should result in a client side timeout error, but the
     * connection should remain valid. In addition, an abandon request should
     * be sent for the timed out operation, although this is not yet verified
     * (see the FIXME in the test body).
     */
    @Test
    public void testClientSideTimeoutForSearchRequest() throws Exception {
        resetState();
        registerSearchEvent();
        // Registered so that the abandon can be awaited once the FIXME below is resolved.
        registerAbandonEvent();
        for (int i = 0; i < ITERATIONS; i++) {
            final Connection connection = factory.getConnection();
            try {
                waitForConnect();
                // A plain mock is used so that the absence of any notification can be verified.
                final ConnectionEventListener listener = mock(ConnectionEventListener.class);
                connection.addConnectionEventListener(listener);
                final ResultHandler<SearchResultEntry> handler = mock(ResultHandler.class);
                final FutureResult<SearchResultEntry> future =
                        connection.readEntryAsync(DN.valueOf("cn=test"), null, handler);
                // Make sure the server has received the search before expecting the timeout.
                waitForSearch();
                // Wait for the request to timeout.
                try {
                    future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
                    fail("The search request succeeded unexpectedly");
                } catch (TimeoutResultException e) {
                    verifyResultCodeIsClientSideTimeout(e);
                    // The handler must receive the same exception instance as the future.
                    verify(handler).handleErrorResult(same(e));
                    // The connection should still be valid.
                    assertThat(connection.isValid()).isTrue();
                    verifyZeroInteractions(listener);
                    /*
                     * FIXME: The search should have been abandoned (see comment
                     * in LDAPConnection for explanation).
                     */
                    // waitForAbandon();
                }
            } finally {
                connection.close();
            }
        }
    }
    @Test
    public void testCreateLDAPConnectionFactory() throws Exception {
        // test no exception is thrown, which means transport provider is correctly loaded
@@ -63,15 +275,6 @@
        factory.close();
    }
    /**
     * Creates and closes a connection factory whose options carry the current
     * thread's context class loader as provider class loader.
     */
    // NOTE(review): a method with this exact name and the same statements is
    // declared again further down in this class; one of the two declarations
    // should be removed.
    @Test
    public void testCreateLDAPConnectionFactoryWithCustomClassLoader() throws Exception {
        // test no exception is thrown, which means transport provider is correctly loaded
        LDAPOptions options = new LDAPOptions().
                setProviderClassLoader(Thread.currentThread().getContextClassLoader());
        LDAPConnectionFactory factory = new LDAPConnectionFactory(findFreeSocketAddress(), options);
        factory.close();
    }
    @Test(expectedExceptions = { ProviderNotFoundException.class },
            expectedExceptionsMessageRegExp = "^The requested provider 'unknown' .*")
    public void testCreateLDAPConnectionFactoryFailureProviderNotFound() throws Exception {
@@ -80,53 +283,133 @@
        factory.close();
    }
    @Test
    public void testCreateLDAPConnectionFactoryWithCustomClassLoader() throws Exception {
        // test no exception is thrown, which means transport provider is correctly loaded
        LDAPOptions options =
                new LDAPOptions().setProviderClassLoader(Thread.currentThread()
                        .getContextClassLoader());
        LDAPConnectionFactory factory = new LDAPConnectionFactory(findFreeSocketAddress(), options);
        factory.close();
    }
    /**
     * This unit test exposes the bug raised in issue OPENDJ-1156: NPE in
     * ReferenceCountedObject after shutting down directory.
     */
    @Test
    public void testResourceManagement() throws Exception {
        final AtomicReference<LDAPClientContext> context = new AtomicReference<LDAPClientContext>();
        final Semaphore latch = new Semaphore(0);
        final LDAPListener server = createServer(latch, context);
        final ConnectionFactory factory = new LDAPConnectionFactory(server.getSocketAddress());
        try {
            for (int i = 0; i < 100; i++) {
                // Connect to the server.
                final Connection connection = factory.getConnection();
                try {
                    // Wait for the server to accept the connection.
                    assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
        resetState();
                    final MockConnectionEventListener listener = new MockConnectionEventListener();
                    connection.addConnectionEventListener(listener);
        for (int i = 0; i < ITERATIONS; i++) {
            final Connection connection = factory.getConnection();
            try {
                waitForConnect();
                final MockConnectionEventListener listener = new MockConnectionEventListener();
                connection.addConnectionEventListener(listener);
                    // Perform remote disconnect which will trigger a client side connection error.
                    context.get().disconnect();
                // Perform remote disconnect which will trigger a client side connection error.
                context.get().disconnect();
                    // Wait for the error notification to reach the client.
                    listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
                } finally {
                    connection.close();
                }
                // Wait for the error notification to reach the client.
                listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
            } finally {
                connection.close();
            }
        } finally {
            factory.close();
            server.close();
        }
    }
    private LDAPListener createServer(final Semaphore latch, final AtomicReference<LDAPClientContext> context)
            throws IOException {
        return new LDAPListener(findFreeSocketAddress(), new ServerConnectionFactory<LDAPClientContext, Integer>() {
            @SuppressWarnings("unchecked")
    private LDAPListener createServer() {
        try {
            return new LDAPListener(findFreeSocketAddress(),
                    new ServerConnectionFactory<LDAPClientContext, Integer>() {
                        @Override
                        public ServerConnection<Integer> handleAccept(
                                final LDAPClientContext clientContext) throws ErrorResultException {
                            context.set(clientContext);
                            connectLatch.release();
                            return serverConnection;
                        }
                    });
        } catch (IOException e) {
            fail("Unable to create LDAP listener", e);
            return null;
        }
    }
    private Stubber notifyEvent(final Semaphore latch) {
        return doAnswer(new Answer<Void>() {
            @Override
            public ServerConnection<Integer> handleAccept(final LDAPClientContext clientContext)
                    throws ErrorResultException {
                context.set(clientContext);
            public Void answer(InvocationOnMock invocation) {
                latch.release();
                return mock(ServerConnection.class);
                return null;
            }
        });
    }
    private void registerAbandonEvent() {
        notifyEvent(abandonLatch).when(serverConnection).handleAbandon(any(Integer.class),
                any(AbandonRequest.class));
    }
    private void registerBindEvent() {
        notifyEvent(bindLatch).when(serverConnection).handleBind(any(Integer.class), anyInt(),
                any(BindRequest.class), any(IntermediateResponseHandler.class),
                any(ResultHandler.class));
    }
    private void registerCloseEvent() {
        notifyEvent(closeLatch).when(serverConnection).handleConnectionClosed(any(Integer.class),
                any(UnbindRequest.class));
    }
    private void registerSearchEvent() {
        notifyEvent(searchLatch).when(serverConnection).handleSearch(any(Integer.class),
                any(SearchRequest.class), any(IntermediateResponseHandler.class),
                any(SearchResultHandler.class));
    }
    private void resetState() {
        connectLatch.drainPermits();
        abandonLatch.drainPermits();
        bindLatch.drainPermits();
        searchLatch.drainPermits();
        closeLatch.drainPermits();
        context.set(null);
        serverConnection = mock(ServerConnection.class);
    }
    private void verifyNoAbandonSent() {
        verify(serverConnection, never()).handleAbandon(any(Integer.class),
                any(AbandonRequest.class));
    }
    private void verifyResultCodeIsClientSideTimeout(ErrorResultException error) {
        assertThat(error.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_TIMEOUT);
    }
    /**
     * Waits for a permit on {@code abandonLatch}, failing the test if none
     * becomes available within the test timeout.
     */
    // Marked unused because its only call site in this class is currently commented out.
    @SuppressWarnings("unused")
    private void waitForAbandon() throws InterruptedException {
        waitForEvent(abandonLatch);
    }
    /**
     * Waits for a permit on {@code bindLatch}, failing the test if none
     * becomes available within the test timeout.
     */
    private void waitForBind() throws InterruptedException {
        waitForEvent(bindLatch);
    }
    /**
     * Waits for a permit on {@code closeLatch}, failing the test if none
     * becomes available within the test timeout.
     */
    private void waitForClose() throws InterruptedException {
        waitForEvent(closeLatch);
    }
    /**
     * Waits for a permit on {@code connectLatch}, which the listener releases
     * when it accepts a connection, failing the test if none becomes available
     * within the test timeout.
     */
    private void waitForConnect() throws InterruptedException {
        waitForEvent(connectLatch);
    }
    private void waitForEvent(final Semaphore latch) throws InterruptedException {
        assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
    }
    /**
     * Waits for a permit on {@code searchLatch}, failing the test if none
     * becomes available within the test timeout.
     */
    private void waitForSearch() throws InterruptedException {
        waitForEvent(searchLatch);
    }
}