opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/ConnectionState.java
New file @@ -0,0 +1,387 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2013 ForgeRock AS. */ package com.forgerock.opendj.ldap; import java.util.LinkedList; import java.util.List; import org.forgerock.opendj.ldap.ConnectionEventListener; import org.forgerock.opendj.ldap.ErrorResultException; import org.forgerock.opendj.ldap.responses.ExtendedResult; /** * This class can be used to manage the internal state of a connection, ensuring * valid and atomic state transitions, as well as connection event listener * notification. There are 4 states: * <ul> * <li>connection is <b>valid</b> (isClosed()=false, isFailed()=false): can fail * or be closed * <li>connection has failed due to an <b>error</b> (isClosed()=false, * isFailed()=true): can be closed * <li>connection has been <b>closed</b> by the application (isClosed()=true, * isFailed()=false): terminal state * <li>connection has failed due to an <b>error</b> and has been <b>closed</b> * by the application (isClosed()=true, isFailed()=true): terminal state * </ul> * All methods are synchronized and container classes may also synchronize on * the state where needed. The state transition methods, * {@link #notifyConnectionClosed()} and * {@link #notifyConnectionError(boolean, ErrorResultException)}, correspond to * methods in the {@link ConnectionEventListener} interface except that they * return a boolean indicating whether the transition was successful or not. */ public final class ConnectionState { /* * FIXME: The synchronization in this class has been kept simple for now. * However, ideally we should notify listeners without synchronizing on the * state in case a listener takes a long time to complete. */ /* * FIXME: This class should be used by connection pool and ldap connection * implementations as well. */ /** * Use the State design pattern to manage state transitions. */ private enum State { /** * Connection has not encountered an error nor has it been closed * (initial state). */ VALID() { @Override void addConnectionEventListener(final ConnectionState cs, final ConnectionEventListener listener) { cs.listeners.add(listener); } @Override boolean isClosed() { return false; } @Override boolean isFailed() { return false; } @Override boolean isValid() { return true; } @Override boolean notifyConnectionClosed(final ConnectionState cs) { for (final ConnectionEventListener listener : cs.listeners) { listener.handleConnectionClosed(); } cs.state = CLOSED; return true; } @Override boolean notifyConnectionError(final ConnectionState cs, final boolean isDisconnectNotification, final ErrorResultException error) { // Transition from valid to error state. cs.failedDueToDisconnect = isDisconnectNotification; cs.connectionError = error; for (final ConnectionEventListener listener : cs.listeners) { // Use the reason provided in the disconnect notification. listener.handleConnectionError(isDisconnectNotification, error); } cs.state = ERROR; return true; } @Override void notifyUnsolicitedNotification(final ConnectionState cs, final ExtendedResult notification) { for (final ConnectionEventListener listener : cs.listeners) { listener.handleUnsolicitedNotification(notification); } } }, /** * Connection has encountered an error, but has not been closed. */ ERROR() { @Override void addConnectionEventListener(final ConnectionState cs, final ConnectionEventListener listener) { listener.handleConnectionError(cs.failedDueToDisconnect, cs.connectionError); cs.listeners.add(listener); } @Override boolean isClosed() { return false; } @Override boolean isFailed() { return true; } @Override boolean isValid() { return false; } @Override boolean notifyConnectionClosed(final ConnectionState cs) { for (final ConnectionEventListener listener : cs.listeners) { listener.handleConnectionClosed(); } cs.state = ERROR_CLOSED; return true; } }, /** * Connection has been closed (terminal state). */ CLOSED() { @Override void addConnectionEventListener(final ConnectionState cs, final ConnectionEventListener listener) { listener.handleConnectionClosed(); } @Override boolean isClosed() { return true; } @Override boolean isFailed() { return false; } @Override boolean isValid() { return false; } }, /** * Connection has encountered an error and has been closed (terminal * state). */ ERROR_CLOSED() { @Override void addConnectionEventListener(final ConnectionState cs, final ConnectionEventListener listener) { listener.handleConnectionError(cs.failedDueToDisconnect, cs.connectionError); listener.handleConnectionClosed(); } @Override boolean isClosed() { return true; } @Override boolean isFailed() { return true; } @Override boolean isValid() { return false; } }; abstract void addConnectionEventListener(ConnectionState cs, final ConnectionEventListener listener); abstract boolean isClosed(); abstract boolean isFailed(); abstract boolean isValid(); boolean notifyConnectionClosed(final ConnectionState cs) { return false; } boolean notifyConnectionError(final ConnectionState cs, final boolean isDisconnectNotification, final ErrorResultException error) { return false; } void notifyUnsolicitedNotification(final ConnectionState cs, final ExtendedResult notification) { // Do nothing by default. } } /** * Non-{@code null} once the connection has failed due to a connection * error. Volatile so that it can be read without synchronization. */ private volatile ErrorResultException connectionError = null; /** * {@code true} if the connection has failed due to a disconnect * notification. */ private boolean failedDueToDisconnect = false; /** * Registered event listeners. */ private final List<ConnectionEventListener> listeners = new LinkedList<ConnectionEventListener>(); /** * Internal state implementation. */ private volatile State state = State.VALID; /** * Creates a new connection state which is initially valid. */ public ConnectionState() { // Nothing to do. } /** * Registers the provided connection event listener so that it will be * notified when this connection is closed by the application, receives an * unsolicited notification, or experiences a fatal error. * * @param listener * The listener which wants to be notified when events occur on * this connection. * @throws IllegalStateException * If this connection has already been closed, i.e. if * {@code isClosed() == true}. * @throws NullPointerException * If the {@code listener} was {@code null}. */ public synchronized void addConnectionEventListener(final ConnectionEventListener listener) { state.addConnectionEventListener(this, listener); } /** * Returns the error that caused the connection to fail, or {@code null} if * the connection has not failed. * * @return The error that caused the connection to fail, or {@code null} if * the connection has not failed. */ public ErrorResultException getConnectionError() { return connectionError; } /** * Indicates whether or not this connection has been explicitly closed by * calling {@code close}. This method will not return {@code true} if a * fatal error has occurred on the connection unless {@code close} has been * called. * * @return {@code true} if this connection has been explicitly closed by * calling {@code close}, or {@code false} otherwise. */ public boolean isClosed() { return state.isClosed(); } /** * Returns {@code true} if the associated connection has not been closed and * no fatal errors have been detected. * * @return {@code true} if this connection is valid, {@code false} * otherwise. */ public boolean isValid() { return state.isValid(); } /** * Attempts to transition this connection state to closed and invokes event * listeners if successful. * * @return {@code true} if the state changed to closed, or {@code false} if * the state was already closed. * @see ConnectionEventListener#handleConnectionClosed() */ public synchronized boolean notifyConnectionClosed() { return state.notifyConnectionClosed(this); } /** * Attempts to transition this connection state to error and invokes event * listeners if successful. * * @param isDisconnectNotification * {@code true} if the error was triggered by a disconnect * notification sent by the server, otherwise {@code false}. * @param error * The exception that is about to be thrown to the application. * @return {@code true} if the state changed to error, or {@code false} if * the state was already error or closed. * @see ConnectionEventListener#handleConnectionError(boolean, * ErrorResultException) */ public synchronized boolean notifyConnectionError(final boolean isDisconnectNotification, final ErrorResultException error) { return state.notifyConnectionError(this, isDisconnectNotification, error); } /** * Notifies event listeners of the provided unsolicited notification if the * state is valid. * * @param notification * The unsolicited notification. * @see ConnectionEventListener#handleUnsolicitedNotification(ExtendedResult) */ public synchronized void notifyUnsolicitedNotification(final ExtendedResult notification) { state.notifyUnsolicitedNotification(this, notification); } /** * Removes the provided connection event listener from this connection so * that it will no longer be notified when this connection is closed by the * application, receives an unsolicited notification, or experiences a fatal * error. * * @param listener * The listener which no longer wants to be notified when events * occur on this connection. * @throws NullPointerException * If the {@code listener} was {@code null}. */ public synchronized void removeConnectionEventListener(final ConnectionEventListener listener) { listeners.remove(listener); } } opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/FutureResultTransformer.java
@@ -22,6 +22,7 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions copyright 2013 ForgeRock AS. */ package com.forgerock.opendj.util; @@ -44,15 +45,12 @@ * The type of the outer result. */ public abstract class FutureResultTransformer<M, N> implements FutureResult<N>, ResultHandler<M> { private final ResultHandler<? super N> handler; private volatile FutureResult<? extends M> future = null; // These do not need to be volatile since the future acts as a memory // barrier. private N transformedResult = null; private ErrorResultException transformedErrorResult = null; /** @@ -77,8 +75,11 @@ * {@inheritDoc} */ public final N get() throws ErrorResultException, InterruptedException { future.get(); try { future.get(); } catch (ErrorResultException ignored) { // Ignore untransformed error. } // The handlers are guaranteed to have been invoked at this point. return get0(); } @@ -88,8 +89,11 @@ */ public final N get(final long timeout, final TimeUnit unit) throws ErrorResultException, TimeoutException, InterruptedException { future.get(timeout, unit); try { future.get(timeout, unit); } catch (ErrorResultException ignored) { // Ignore untransformed error. } // The handlers are guaranteed to have been invoked at this point. return get0(); } opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/TimeSource.java
New file @@ -0,0 +1,52 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2013 ForgeRock AS. */ package com.forgerock.opendj.util; /** * An interface which is used by various components in order to obtain the * current time. */ public interface TimeSource { /** * Default implementation which delegates to * {@link System#currentTimeMillis()}. */ public static final TimeSource DEFAULT = new TimeSource() { @Override public long currentTimeMillis() { return System.currentTimeMillis(); } }; /** * Returns the current time in milli-seconds. * * @return The current time in milli-seconds. */ long currentTimeMillis(); } opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -35,7 +35,6 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -67,6 +66,7 @@ import com.forgerock.opendj.util.AsynchronousFutureResult; import com.forgerock.opendj.util.CompletedFutureResult; import com.forgerock.opendj.util.ReferenceCountedObject; import com.forgerock.opendj.util.TimeSource; import com.forgerock.opendj.util.Validator; /** @@ -552,7 +552,7 @@ * since we don't want to hold the lock too long. */ idleConnections = new LinkedList<Connection>(); final long timeoutMillis = currentTimeMillis() - idleTimeoutMillis; final long timeoutMillis = timeSource.currentTimeMillis() - idleTimeoutMillis; int nonCoreConnectionCount = currentPoolSize() - corePoolSize; for (QueueElement holder = queue.peek(); nonCoreConnectionCount > 0 && isTimedOutQueuedConnection(holder, timeoutMillis); holder = queue.peek()) { @@ -631,10 +631,10 @@ } /** * This is intended for unit testing only in order to inject fake time * stamps. Use System.currentTimeMillis() when null. * This is package private in order to allow unit tests to inject fake time * stamps. */ Callable<Long> testTimeSource = null; TimeSource timeSource = TimeSource.DEFAULT; private final Semaphore availableConnections; private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler(); @@ -740,7 +740,7 @@ } else if (hasWaitingConnections()) { holder = queue.removeFirst(); } else { holder = new QueueElement(handler, currentTimeMillis()); holder = new QueueElement(handler, timeSource.currentTimeMillis()); queue.add(holder); } } @@ -802,23 +802,6 @@ return maxPoolSize - availableConnections.availablePermits(); } /* * This method delegates to System.currentTimeMillis() except in unit tests * where we use injected times. */ private long currentTimeMillis() { if (testTimeSource == null) { return System.currentTimeMillis(); } else { try { return testTimeSource.call(); } catch (final Exception e) { // Should not happen. throw new RuntimeException(e); } } } private boolean hasWaitingConnections() { return !queue.isEmpty() && !queue.getFirst().isWaitingFuture(); } @@ -839,7 +822,7 @@ connectionPoolIsClosing = true; holder = null; } else { holder = new QueueElement(connection, currentTimeMillis()); holder = new QueueElement(connection, timeSource.currentTimeMillis()); queue.add(holder); return; } opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -245,7 +245,8 @@ * Creates a new heart-beat connection factory which will create connections * using the provided connection factory and periodically ping any created * connections in order to detect that they are still alive every 10 seconds * using the default scheduler. * using the default scheduler. Connections will be marked as having failed * if a heart-beat takes longer than 500ms. * * @param factory * The connection factory to use for creating connections. @@ -254,7 +255,8 @@ * If {@code factory} was {@code null}. */ public static ConnectionFactory newHeartBeatConnectionFactory(final ConnectionFactory factory) { return new HeartBeatConnectionFactory(factory); return new HeartBeatConnectionFactory(factory, 10000, 500, TimeUnit.MILLISECONDS, null, null); } /** @@ -267,6 +269,9 @@ * The connection factory to use for creating connections. * @param interval * The interval between keepalive pings. * @param timeout * The heart-beat timeout after which a connection will be marked * as failed. * @param unit * The time unit for the interval between keepalive pings. * @return The new heart-beat connection factory. @@ -276,8 +281,8 @@ * If {@code factory} or {@code unit} was {@code null}. */ public static ConnectionFactory newHeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, final TimeUnit unit) { return new HeartBeatConnectionFactory(factory, interval, unit); final long interval, final long timeout, final TimeUnit unit) { return new HeartBeatConnectionFactory(factory, interval, timeout, unit, null, null); } /** @@ -290,6 +295,9 @@ * The connection factory to use for creating connections. * @param interval * The interval between keepalive pings. * @param timeout * The heart-beat timeout after which a connection will be marked * as failed. * @param unit * The time unit for the interval between keepalive pings. * @param heartBeat @@ -302,8 +310,9 @@ * {@code null}. */ public static ConnectionFactory newHeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, final TimeUnit unit, final SearchRequest heartBeat) { return new HeartBeatConnectionFactory(factory, interval, unit, heartBeat); final long interval, final long timeout, final TimeUnit unit, final SearchRequest heartBeat) { return new HeartBeatConnectionFactory(factory, interval, timeout, unit, heartBeat, null); } /** @@ -316,6 +325,9 @@ * The connection factory to use for creating connections. * @param interval * The interval between keepalive pings. * @param timeout * The heart-beat timeout after which a connection will be marked * as failed. * @param unit * The time unit for the interval between keepalive pings. * @param heartBeat @@ -331,9 +343,10 @@ * {@code null}. */ public static ConnectionFactory newHeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, final TimeUnit unit, final SearchRequest heartBeat, final ScheduledExecutorService scheduler) { return new HeartBeatConnectionFactory(factory, interval, unit, heartBeat, scheduler); final long interval, final long timeout, final TimeUnit unit, final SearchRequest heartBeat, final ScheduledExecutorService scheduler) { return new HeartBeatConnectionFactory(factory, interval, timeout, unit, heartBeat, scheduler); } /** opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ErrorResultException.java
@@ -72,7 +72,7 @@ * If {@code resultCode} was {@code null}. */ public static ErrorResultException newErrorResult(ResultCode resultCode, String diagnosticMessage) { CharSequence diagnosticMessage) { return newErrorResult(resultCode, diagnosticMessage, null); } @@ -114,10 +114,10 @@ * If {@code resultCode} was {@code null}. */ public static ErrorResultException newErrorResult(ResultCode resultCode, String diagnosticMessage, Throwable cause) { CharSequence diagnosticMessage, Throwable cause) { final Result result = Responses.newResult(resultCode); if (diagnosticMessage != null) { result.setDiagnosticMessage(diagnosticMessage); result.setDiagnosticMessage(diagnosticMessage.toString()); } else if (cause != null) { result.setDiagnosticMessage(cause.getLocalizedMessage()); } opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -27,23 +27,27 @@ package org.forgerock.opendj.ldap; import static com.forgerock.opendj.util.StaticUtils.*; import static java.lang.System.*; import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG; import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER; import static org.forgerock.opendj.ldap.CoreMessages.HBCF_CONNECTION_CLOSED_BY_CLIENT; import static org.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_FAILED; import static org.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_TIMEOUT; import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult; import static org.forgerock.opendj.ldap.ErrorResultException.*; import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.logging.Level; import org.forgerock.opendj.ldap.requests.AbandonRequest; import org.forgerock.opendj.ldap.requests.AddRequest; import org.forgerock.opendj.ldap.requests.BindRequest; import org.forgerock.opendj.ldap.requests.CompareRequest; @@ -54,30 +58,297 @@ import org.forgerock.opendj.ldap.requests.Requests; import org.forgerock.opendj.ldap.requests.SearchRequest; import org.forgerock.opendj.ldap.requests.StartTLSExtendedRequest; import org.forgerock.opendj.ldap.requests.UnbindRequest; import org.forgerock.opendj.ldap.responses.BindResult; import org.forgerock.opendj.ldap.responses.CompareResult; import org.forgerock.opendj.ldap.responses.ExtendedResult; import org.forgerock.opendj.ldap.responses.GenericExtendedResult; import org.forgerock.opendj.ldap.responses.Result; import org.forgerock.opendj.ldap.responses.SearchResultEntry; import org.forgerock.opendj.ldap.responses.SearchResultReference; import org.forgerock.opendj.ldif.ConnectionEntryReader; import com.forgerock.opendj.ldap.ConnectionState; import com.forgerock.opendj.util.AsynchronousFutureResult; import com.forgerock.opendj.util.CompletedFutureResult; import com.forgerock.opendj.util.FutureResultTransformer; import com.forgerock.opendj.util.RecursiveFutureResult; import com.forgerock.opendj.util.ReferenceCountedObject; import com.forgerock.opendj.util.TimeSource; import com.forgerock.opendj.util.Validator; /** * An heart beat connection factory can be used to create connections that sends * a periodic search request to a Directory Server. * <p> * Before returning new connections to the application this factory will first * send an initial heart beat in order to determine that the remote server is * responsive. If the heart beat fails or times out then the connection is * closed immediately and an error returned to the client. * <p> * Once a connection has been established successfully (including the initial * heart beat), this factory will periodically send heart beats on the * connection based on the configured heart beat interval. If the heart beat * times out then the server is assumed to be down and an appropriate * {@link ConnectionException} generated and published to any registered * {@link ConnectionEventListener}s. Note however, that heart beats will only be * sent when the connection is determined to be reasonably idle: there is no * point in sending heart beats if the connection has recently received a * response. A connection is deemed to be idle if no response has been received * during a period equivalent to half the heart beat interval. * <p> * The LDAP protocol specifically precludes clients from performing operations * while bind or startTLS requests are being performed. Likewise, a bind or * startTLS request will cause active operations to be aborted. This factory * coordinates heart beats with bind or startTLS requests, ensuring that they * are not performed concurrently. Specifically, bind and startTLS requests are * queued up while a heart beat is pending, and heart beats are not sent at all * while there are pending bind or startTLS requests. */ final class HeartBeatConnectionFactory implements ConnectionFactory { /** * This class is responsible for performing an initial heart beat once a new * connection has been obtained and, if the heart beat succeeds, adapting it * to a {@code ConnectionImpl} and registering it in the table of valid * connections. */ private final class ConnectionFutureResultImpl { /** * This class handles the initial heart beat result notification or * timeout. We need to take care to avoid processing multiple results, * which may occur when the heart beat is timed out and a result follows * soon after, or vice versa. */ private final class InitialHeartBeatResultHandler implements SearchResultHandler, Runnable { private final ResultHandler<? super Result> handler; /** * Due to a potential race between the heart beat timing out and the * heart beat completing this atomic ensures that notification only * occurs once. */ private final AtomicBoolean isComplete = new AtomicBoolean(); private InitialHeartBeatResultHandler(final ResultHandler<? super Result> handler) { this.handler = handler; } @Override public boolean handleEntry(final SearchResultEntry entry) { /* * Depending on the configuration, a heartbeat may return some * entries. However, we can just ignore them. */ return true; } @Override public void handleErrorResult(final ErrorResultException error) { if (isComplete.compareAndSet(false, true)) { handler.handleErrorResult(error); } } @Override public boolean handleReference(final SearchResultReference reference) { /* * Depending on the configuration, a heartbeat may return some * references. However, we can just ignore them. */ return true; } @Override public void handleResult(final Result result) { if (isComplete.compareAndSet(false, true)) { handler.handleResult(result); } } /* * Invoked by the scheduler when the heart beat times out. */ @Override public void run() { handleErrorResult(newHeartBeatTimeoutError()); } } private Connection connection; private final RecursiveFutureResult<Connection, Result> futureConnectionResult; private final FutureResultTransformer<Result, Connection> futureSearchResult; private ConnectionFutureResultImpl(final ResultHandler<? super Connection> handler) { // Create a future which will handle the initial heart beat result. this.futureSearchResult = new FutureResultTransformer<Result, Connection>(handler) { @Override protected ErrorResultException transformErrorResult( final ErrorResultException errorResult) { // Ensure that the connection is closed. if (connection != null) { connection.close(); connection = null; } return adaptHeartBeatError(errorResult); } @Override protected Connection transformResult(final Result result) throws ErrorResultException { return adaptConnection(connection); } }; // Create a future which will handle connection result. this.futureConnectionResult = new RecursiveFutureResult<Connection, Result>(futureSearchResult) { @Override protected FutureResult<? extends Result> chainResult( final Connection innerResult, final ResultHandler<? super Result> handler) throws ErrorResultException { // Save the connection for later once the heart beat completes. connection = innerResult; /* * Send the initial heart beat and schedule a client * side timeout notification. */ final InitialHeartBeatResultHandler wrappedHandler = new InitialHeartBeatResultHandler(handler); scheduler.get().schedule(wrappedHandler, timeoutMS, TimeUnit.MILLISECONDS); return connection.searchAsync(heartBeatRequest, null, wrappedHandler); } }; // Link the two futures. futureSearchResult.setFutureResult(futureConnectionResult); } } /** * A connection that sends heart beats and supports all operations. */ private final class ConnectionImpl extends AbstractConnectionWrapper<Connection> implements ConnectionEventListener, SearchResultHandler { private final class ConnectionImpl extends AbstractAsynchronousConnection implements ConnectionEventListener { /** * A result handler wrapper for operations which timestamps the * connection for each response received. The wrapper ensures that * completed requests are removed from the {@code pendingResults} queue, * as well as ensuring that requests are only completed once. */ private abstract class AbstractWrappedResultHandler<R, H extends ResultHandler<? super R>> implements ResultHandler<R>, FutureResult<R> { /** The user provided result handler. */ protected final H handler; private final CountDownLatch completed = new CountDownLatch(1); private ErrorResultException error; private FutureResult<R> innerFuture; private R result; AbstractWrappedResultHandler(final H handler) { this.handler = handler; } @Override public boolean cancel(final boolean mayInterruptIfRunning) { return innerFuture.cancel(mayInterruptIfRunning); } @Override public R get() throws ErrorResultException, InterruptedException { completed.await(); return get0(); } @Override public R get(final long timeout, final TimeUnit unit) throws ErrorResultException, TimeoutException, InterruptedException { if (completed.await(timeout, unit)) { return get0(); } else { throw new TimeoutException(); } } @Override public int getRequestID() { return innerFuture.getRequestID(); } @Override public void handleErrorResult(final ErrorResultException error) { if (tryComplete(null, error)) { if (handler != null) { handler.handleErrorResult(timestamp(error)); } else { timestamp(error); } } } @Override public void handleResult(final R result) { if (tryComplete(result, null)) { if (handler != null) { handler.handleResult(timestamp(result)); } else { timestamp(result); } } } @Override public boolean isCancelled() { return innerFuture.isCancelled(); } @Override public boolean isDone() { return completed.getCount() == 0; } abstract void releaseBindOrStartTLSLockIfNeeded(); FutureResult<R> setInnerFuture(final FutureResult<R> innerFuture) { this.innerFuture = innerFuture; return this; } private R get0() throws ErrorResultException { if (result != null) { return result; } else { throw error; } } /** * Attempts to complete this request, returning true if successful. * This method is synchronized in order to avoid race conditions * with search result processing. */ private synchronized boolean tryComplete(final R result, final ErrorResultException error) { if (pendingResults.remove(this)) { this.result = result; this.error = error; completed.countDown(); releaseBindOrStartTLSLockIfNeeded(); return true; } else { return false; } } } /** * Runs pending request once the shared lock becomes available (when no @@ -120,161 +391,246 @@ } return null; } } /* * List of pending Bind or StartTLS requests which must be invoked when /** * A result handler wrapper for bind or startTLS requests which releases * the bind/startTLS lock on completion. */ private final class WrappedBindOrStartTLSResultHandler<R> extends AbstractWrappedResultHandler<R, ResultHandler<? super R>> { WrappedBindOrStartTLSResultHandler(final ResultHandler<? super R> handler) { super(handler); } @Override void releaseBindOrStartTLSLockIfNeeded() { releaseBindOrStartTLSLock(); } } /** * A result handler wrapper for normal requests which does not release * the bind/startTLS lock on completion. */ private final class WrappedResultHandler<R> extends AbstractWrappedResultHandler<R, ResultHandler<? super R>> { WrappedResultHandler(final ResultHandler<? super R> handler) { super(handler); } @Override void releaseBindOrStartTLSLockIfNeeded() { // No-op for normal operations. } } /** * A result handler wrapper for search operations. Ensures that search * results are not sent once the request has been completed (see * markComplete()). */ private final class WrappedSearchResultHandler extends AbstractWrappedResultHandler<Result, SearchResultHandler> implements SearchResultHandler { WrappedSearchResultHandler(final SearchResultHandler handler) { super(handler); } @Override public synchronized boolean handleEntry(final SearchResultEntry entry) { if (!isDone()) { if (handler != null) { handler.handleEntry(timestamp(entry)); } else { timestamp(entry); } return true; } else { return false; } } @Override public synchronized boolean handleReference(final SearchResultReference reference) { if (!isDone()) { if (handler != null) { handler.handleReference(timestamp(reference)); } else { timestamp(reference); } return true; } else { return false; } } @Override void releaseBindOrStartTLSLockIfNeeded() { // No-op for normal operations. } } /** The wrapped connection. */ private final Connection connection; /** * Search result handler for processing heart beat responses. */ private final SearchResultHandler heartBeatHandler = new SearchResultHandler() { @Override public boolean handleEntry(final SearchResultEntry entry) { timestamp(entry); return true; } @Override public void handleErrorResult(final ErrorResultException error) { /* * Connection failure will be handled by connection event * listener. Ignore cancellation errors since these indicate * that the heart beat was aborted by a client-side close. */ if (!(error instanceof CancelledResultException)) { if (DEBUG_LOG.isLoggable(Level.WARNING)) { DEBUG_LOG.warning(String.format( "Heartbeat failed for connection factory '%s': %s", factory, error .getMessage())); } timestamp(error); } releaseHeartBeatLock(); } @Override public boolean handleReference(final SearchResultReference reference) { timestamp(reference); return true; } @Override public void handleResult(final Result result) { timestamp(result); releaseHeartBeatLock(); } }; /** * List of pending Bind or StartTLS requests which must be invoked once * the current heart beat completes. */ private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>(); private final Queue<Runnable> pendingBindOrStartTLSRequests = new ConcurrentLinkedQueue<Runnable>(); /* Coordinates heart-beats with Bind and StartTLS requests. */ /** * List of pending responses for all active operations. These will be * signalled if no heart beat is detected within the permitted timeout * period. */ private final Queue<AbstractWrappedResultHandler<?, ?>> pendingResults = new ConcurrentLinkedQueue<AbstractWrappedResultHandler<?, ?>>(); /** Internal connection state. */ private final ConnectionState state = new ConnectionState(); /** Coordinates heart-beats with Bind and StartTLS requests. */ private final Sync sync = new Sync(); /* /** * Timestamp of last response received (any response, not just heart * beats). */ private volatile long timestamp = currentTimeMillis(); // Assume valid at creation. private volatile long lastResponseTimestamp = timeSource.currentTimeMillis(); // Assume valid at creation. private ConnectionImpl(final Connection connection) { super(connection); this.connection = connection; connection.addConnectionEventListener(this); } @Override public Result add(final AddRequest request) throws ErrorResultException { try { return timestamp(connection.add(request)); } catch (final ErrorResultException e) { throw timestamp(e); } } @Override public Result add(final Entry entry) throws ErrorResultException { try { return timestamp(connection.add(entry)); } catch (final ErrorResultException e) { throw timestamp(e); } } @Override public Result add(final String... ldifLines) throws ErrorResultException { try { return timestamp(connection.add(ldifLines)); } catch (final ErrorResultException e) { throw timestamp(e); } public FutureResult<Void> abandonAsync(final AbandonRequest request) { return connection.abandonAsync(request); } @Override public FutureResult<Result> addAsync(final AddRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super Result> resultHandler) { return connection.addAsync(request, intermediateResponseHandler, timestamper(resultHandler)); } @Override public BindResult bind(final BindRequest request) throws ErrorResultException { acquireBindOrStartTLSLock(); try { return timestamp(connection.bind(request)); } catch (final ErrorResultException e) { throw timestamp(e); } finally { releaseBindOrStartTLSLock(); if (checkState(resultHandler)) { final WrappedResultHandler<Result> h = wrap(resultHandler); return checkState(connection.addAsync(request, intermediateResponseHandler, h), h); } else { return newConnectionErrorFuture(); } } @Override public BindResult bind(final String name, final char[] password) throws ErrorResultException { acquireBindOrStartTLSLock(); try { return timestamp(connection.bind(name, password)); } catch (final ErrorResultException e) { throw timestamp(e); } finally { releaseBindOrStartTLSLock(); } public void addConnectionEventListener(final ConnectionEventListener listener) { state.addConnectionEventListener(listener); } @Override public FutureResult<BindResult> bindAsync(final BindRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super BindResult> resultHandler) { if (sync.tryLockShared()) { // Fast path return connection.bindAsync(request, intermediateResponseHandler, timestamper( resultHandler, true)); if (checkState(resultHandler)) { if (sync.tryLockShared()) { // Fast path final WrappedBindOrStartTLSResultHandler<BindResult> h = wrapForBindOrStartTLS(resultHandler); return checkState( connection.bindAsync(request, intermediateResponseHandler, h), h); } else { /* * A heart beat must be in progress so create a runnable * task which will be executed when the heart beat * completes. */ final DelayedFuture<BindResult> future = new DelayedFuture<BindResult>(resultHandler) { @Override public FutureResult<BindResult> dispatch() { final WrappedBindOrStartTLSResultHandler<BindResult> h = wrapForBindOrStartTLS(this); return checkState(connection.bindAsync(request, intermediateResponseHandler, h), h); } }; /* * Enqueue and flush if the heart beat has completed in the * mean time. */ pendingBindOrStartTLSRequests.offer(future); flushPendingBindOrStartTLSRequests(); return future; } } else { /* * A heart beat must be in progress so create a runnable task * which will be executed when the heart beat completes. */ final DelayedFuture<BindResult> future = new DelayedFuture<BindResult>(resultHandler) { @Override public FutureResult<BindResult> dispatch() { return connection.bindAsync(request, intermediateResponseHandler, timestamper(this, true)); } }; /* * Enqueue and flush if the heart beat has completed in the mean * time. */ pendingRequests.offer(future); flushPendingRequests(); return future; return newConnectionErrorFuture(); } } @Override public CompareResult compare(final CompareRequest request) throws ErrorResultException { try { return timestamp(connection.compare(request)); } catch (final ErrorResultException e) { throw timestamp(e); } public void close() { handleConnectionClosed(); connection.close(); } @Override public CompareResult compare(final String name, final String attributeDescription, final String assertionValue) throws ErrorResultException { try { return timestamp(connection.compare(name, attributeDescription, assertionValue)); } catch (final ErrorResultException e) { throw timestamp(e); } public void close(final UnbindRequest request, final String reason) { handleConnectionClosed(); connection.close(request, reason); } @Override public FutureResult<CompareResult> compareAsync(final CompareRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super CompareResult> resultHandler) { return connection.compareAsync(request, intermediateResponseHandler, timestamper(resultHandler)); } @Override public Result delete(final DeleteRequest request) throws ErrorResultException { try { return timestamp(connection.delete(request)); } catch (final ErrorResultException e) { throw timestamp(e); } } @Override public Result delete(final String name) throws ErrorResultException { try { return timestamp(connection.delete(name)); } catch (final ErrorResultException e) { throw timestamp(e); if (checkState(resultHandler)) { final WrappedResultHandler<CompareResult> h = wrap(resultHandler); return checkState(connection.compareAsync(request, intermediateResponseHandler, h), h); } else { return newConnectionErrorFuture(); } } @@ -282,61 +638,12 @@ public FutureResult<Result> deleteAsync(final DeleteRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super Result> resultHandler) { return connection.deleteAsync(request, intermediateResponseHandler, timestamper(resultHandler)); } @Override public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request) throws ErrorResultException { final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID); if (isStartTLS) { acquireBindOrStartTLSLock(); } try { return timestamp(connection.extendedRequest(request)); } catch (final ErrorResultException e) { throw timestamp(e); } finally { if (isStartTLS) { releaseBindOrStartTLSLock(); } } } @Override public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request, final IntermediateResponseHandler handler) throws ErrorResultException { final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID); if (isStartTLS) { acquireBindOrStartTLSLock(); } try { return timestamp(connection.extendedRequest(request, handler)); } catch (final ErrorResultException e) { throw timestamp(e); } finally { if (isStartTLS) { releaseBindOrStartTLSLock(); } } } @Override public GenericExtendedResult extendedRequest(final String requestName, final ByteString requestValue) throws ErrorResultException { final boolean isStartTLS = requestName.equals(StartTLSExtendedRequest.OID); if (isStartTLS) { acquireBindOrStartTLSLock(); } try { return timestamp(connection.extendedRequest(requestName, requestValue)); } catch (final ErrorResultException e) { throw timestamp(e); } finally { if (isStartTLS) { releaseBindOrStartTLSLock(); } if (checkState(resultHandler)) { final WrappedResultHandler<Result> h = wrap(resultHandler); return checkState(connection.deleteAsync(request, intermediateResponseHandler, h), h); } else { return newConnectionErrorFuture(); } } @@ -345,129 +652,101 @@ final ExtendedRequest<R> request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super R> resultHandler) { final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID); if (isStartTLS) { if (sync.tryLockShared()) { // Fast path return connection.extendedRequestAsync(request, intermediateResponseHandler, timestamper(resultHandler, true)); } else { /* * A heart beat must be in progress so create a runnable * task which will be executed when the heart beat * completes. */ final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) { @Override public FutureResult<R> dispatch() { return connection.extendedRequestAsync(request, intermediateResponseHandler, timestamper(this, true)); } }; if (checkState(resultHandler)) { if (isStartTLSRequest(request)) { if (sync.tryLockShared()) { // Fast path final WrappedBindOrStartTLSResultHandler<R> h = wrapForBindOrStartTLS(resultHandler); return checkState(connection.extendedRequestAsync(request, intermediateResponseHandler, h), h); } else { /* * A heart beat must be in progress so create a runnable * task which will be executed when the heart beat * completes. */ final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) { @Override public FutureResult<R> dispatch() { final WrappedBindOrStartTLSResultHandler<R> h = wrapForBindOrStartTLS(this); return checkState(connection.extendedRequestAsync(request, intermediateResponseHandler, h), h); } }; /* * Enqueue and flush if the heart beat has completed in the * mean time. */ pendingRequests.offer(future); flushPendingRequests(); return future; /* * Enqueue and flush if the heart beat has completed in * the mean time. */ pendingBindOrStartTLSRequests.offer(future); flushPendingBindOrStartTLSRequests(); return future; } } else { final WrappedResultHandler<R> h = wrap(resultHandler); return checkState(connection.extendedRequestAsync(request, intermediateResponseHandler, h), h); } } else { return connection.extendedRequestAsync(request, intermediateResponseHandler, timestamper(resultHandler)); return newConnectionErrorFuture(); } } @Override public void handleConnectionClosed() { notifyClosed(); if (state.notifyConnectionClosed()) { failPendingResults(newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, HBCF_CONNECTION_CLOSED_BY_CLIENT.get())); synchronized (validConnections) { connection.removeConnectionEventListener(this); validConnections.remove(this); if (validConnections.isEmpty()) { /* * This is the last active connection, so stop the * heartbeat. */ heartBeatFuture.cancel(false); } } } } @Override public void handleConnectionError(final boolean isDisconnectNotification, final ErrorResultException error) { notifyClosed(); } @Override public boolean handleEntry(final SearchResultEntry entry) { updateTimestamp(); return true; } @Override public void handleErrorResult(final ErrorResultException error) { if (DEBUG_LOG.isLoggable(Level.FINE)) { DEBUG_LOG.fine(String.format("Heartbeat failed: %s", error.getMessage())); if (state.notifyConnectionError(isDisconnectNotification, error)) { failPendingResults(error); } updateTimestamp(); releaseHeartBeatLock(); } @Override public boolean handleReference(final SearchResultReference reference) { updateTimestamp(); return true; } @Override public void handleResult(final Result result) { updateTimestamp(); releaseHeartBeatLock(); } @Override public void handleUnsolicitedNotification(final ExtendedResult notification) { updateTimestamp(); timestamp(notification); state.notifyUnsolicitedNotification(notification); } @Override public boolean isClosed() { return state.isClosed(); } @Override public boolean isValid() { return connection.isValid() && currentTimeMillis() < (timestamp + timeoutMS); } @Override public Result modify(final ModifyRequest request) throws ErrorResultException { try { return timestamp(connection.modify(request)); } catch (final ErrorResultException e) { throw timestamp(e); } } @Override public Result modify(final String... ldifLines) throws ErrorResultException { try { return timestamp(connection.modify(ldifLines)); } catch (final ErrorResultException e) { throw timestamp(e); } return state.isValid() && connection.isValid(); } @Override public FutureResult<Result> modifyAsync(final ModifyRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super Result> resultHandler) { return connection.modifyAsync(request, intermediateResponseHandler, timestamper(resultHandler)); } @Override public Result modifyDN(final ModifyDNRequest request) throws ErrorResultException { try { return timestamp(connection.modifyDN(request)); } catch (final ErrorResultException e) { throw timestamp(e); } } @Override public Result modifyDN(final String name, final String newRDN) throws ErrorResultException { try { return timestamp(connection.modifyDN(name, newRDN)); } catch (final ErrorResultException e) { throw timestamp(e); if (checkState(resultHandler)) { final WrappedResultHandler<Result> h = wrap(resultHandler); return checkState(connection.modifyAsync(request, intermediateResponseHandler, h), h); } else { return newConnectionErrorFuture(); } } @@ -475,121 +754,34 @@ public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super Result> resultHandler) { return connection.modifyDNAsync(request, intermediateResponseHandler, timestamper(resultHandler)); } @Override public SearchResultEntry readEntry(final DN name, final String... attributeDescriptions) throws ErrorResultException { try { return timestamp(connection.readEntry(name, attributeDescriptions)); } catch (final ErrorResultException e) { throw timestamp(e); if (checkState(resultHandler)) { final WrappedResultHandler<Result> h = wrap(resultHandler); return checkState( connection.modifyDNAsync(request, intermediateResponseHandler, h), h); } else { return newConnectionErrorFuture(); } } @Override public SearchResultEntry readEntry(final String name, final String... attributeDescriptions) throws ErrorResultException { try { return timestamp(connection.readEntry(name, attributeDescriptions)); } catch (final ErrorResultException e) { throw timestamp(e); } } @Override public FutureResult<SearchResultEntry> readEntryAsync(final DN name, final Collection<String> attributeDescriptions, final ResultHandler<? super SearchResultEntry> handler) { return connection.readEntryAsync(name, attributeDescriptions, timestamper(handler)); } @Override public ConnectionEntryReader search(final SearchRequest request) { // Ensure that search results update timestamp. return new ConnectionEntryReader(this, request); } @Override public Result search(final SearchRequest request, final Collection<? super SearchResultEntry> entries) throws ErrorResultException { try { return timestamp(connection.search(request, entries)); } catch (final ErrorResultException e) { throw timestamp(e); } } @Override public Result search(final SearchRequest request, final Collection<? super SearchResultEntry> entries, final Collection<? super SearchResultReference> references) throws ErrorResultException { try { return timestamp(connection.search(request, entries, references)); } catch (final ErrorResultException e) { throw timestamp(e); } } @Override public Result search(final SearchRequest request, final SearchResultHandler handler) throws ErrorResultException { try { return connection.search(request, timestamper(handler)); } catch (final ErrorResultException e) { throw timestamp(e); } } @Override public ConnectionEntryReader search(final String baseObject, final SearchScope scope, final String filter, final String... attributeDescriptions) { // Ensure that search results update timestamp. final SearchRequest request = Requests.newSearchRequest(baseObject, scope, filter, attributeDescriptions); return new ConnectionEntryReader(this, request); public void removeConnectionEventListener(final ConnectionEventListener listener) { state.removeConnectionEventListener(listener); } @Override public FutureResult<Result> searchAsync(final SearchRequest request, final IntermediateResponseHandler intermediateResponseHandler, final SearchResultHandler resultHandler) { return connection.searchAsync(request, intermediateResponseHandler, timestamper(resultHandler)); } @Override public SearchResultEntry searchSingleEntry(final SearchRequest request) throws ErrorResultException { try { return timestamp(connection.searchSingleEntry(request)); } catch (final ErrorResultException e) { throw timestamp(e); if (checkState(resultHandler)) { final WrappedSearchResultHandler h = wrap(resultHandler); return checkState(connection.searchAsync(request, intermediateResponseHandler, h), h); } else { return newConnectionErrorFuture(); } } @Override public SearchResultEntry searchSingleEntry(final String baseObject, final SearchScope scope, final String filter, final String... attributeDescriptions) throws ErrorResultException { try { return timestamp(connection.searchSingleEntry(baseObject, scope, filter, attributeDescriptions)); } catch (final ErrorResultException e) { throw timestamp(e); } } @Override public FutureResult<SearchResultEntry> searchSingleEntryAsync(final SearchRequest request, final ResultHandler<? super SearchResultEntry> handler) { return connection.searchSingleEntryAsync(request, timestamper(handler)); } @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("HeartBeatConnection("); @@ -598,24 +790,57 @@ return builder.toString(); } private void acquireBindOrStartTLSLock() throws ErrorResultException { /* * Wait for pending heartbeats and prevent new heartbeats from being * sent while the bind is in progress. */ try { if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) { // Give up - it looks like the connection is dead. // FIXME: improve error message. throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN); private void checkForHeartBeat() { if (sync.isHeldExclusively()) { /* * A heart beat is still in progress, but it should have * completed by now. Let's avoid aggressively terminating the * connection, because the heart beat may simply have been * delayed by a sudden surge of activity. Therefore, only flag * the connection as failed if no activity has been seen on the * connection since the heart beat was sent. */ final long currentTimeMillis = timeSource.currentTimeMillis(); if (lastResponseTimestamp < (currentTimeMillis - timeoutMS)) { if (DEBUG_LOG.isLoggable(Level.WARNING)) { DEBUG_LOG.warning(String.format( "No heartbeat detected for connection '%s'", connection)); } handleConnectionError(false, newHeartBeatTimeoutError()); } } catch (final InterruptedException e) { throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e); } } private void flushPendingRequests() { if (!pendingRequests.isEmpty()) { private <R> FutureResult<R> checkState(final FutureResult<R> future, final AbstractWrappedResultHandler<R, ? extends ResultHandler<? super R>> h) { h.setInnerFuture(future); checkState(h); return h; } private boolean checkState(final ResultHandler<?> h) { final ErrorResultException error = state.getConnectionError(); if (error != null) { h.handleErrorResult(error); return false; } else { return true; } } private void failPendingResults(final ErrorResultException error) { /* * Peek instead of pool because notification is responsible for * removing the element from the queue. */ AbstractWrappedResultHandler<?, ?> pendingResult; while ((pendingResult = pendingResults.peek()) != null) { pendingResult.handleErrorResult(error); } } private void flushPendingBindOrStartTLSRequests() { if (!pendingBindOrStartTLSRequests.isEmpty()) { /* * The pending requests will acquire the shared lock, but we * take it here anyway to ensure that pending requests do not @@ -624,7 +849,8 @@ if (sync.tryLockShared()) { try { Runnable pendingRequest; while ((pendingRequest = pendingRequests.poll()) != null) { while ((pendingRequest = pendingBindOrStartTLSRequests.poll()) != null) { // Dispatch the waiting request. This will not block. pendingRequest.run(); } } finally { @@ -634,18 +860,12 @@ } } private void notifyClosed() { synchronized (activeConnections) { connection.removeConnectionEventListener(this); activeConnections.remove(this); if (activeConnections.isEmpty()) { /* * This is the last active connection, so stop the * heartbeat. */ heartBeatFuture.cancel(false); } } private boolean isStartTLSRequest(final ExtendedRequest<?> request) { return request.getOID().equals(StartTLSExtendedRequest.OID); } private <R> CompletedFutureResult<R> newConnectionErrorFuture() { return new CompletedFutureResult<R>(state.getConnectionError()); } private void releaseBindOrStartTLSLock() { @@ -654,27 +874,43 @@ private void releaseHeartBeatLock() { sync.unlockExclusively(); flushPendingRequests(); flushPendingBindOrStartTLSRequests(); } private void sendHeartBeat() { /** * Sends a heart beat on this connection if required to do so. * * @return {@code true} if a heart beat was sent, otherwise * {@code false}. */ private boolean sendHeartBeat() { /* * Only send the heartbeat if the connection has been idle for some * Don't attempt to send a heart beat if the connection has already * failed. */ if (!state.isValid()) { return false; } /* * Only send the heart beat if the connection has been idle for some * time. */ if (currentTimeMillis() < (timestamp + minDelayMS)) { return; final long currentTimeMillis = timeSource.currentTimeMillis(); if (currentTimeMillis < (lastResponseTimestamp + minDelayMS)) { return false; } /* * Don't send a heart beat if there is already a heart beat, bind, * or startTLS in progress. Note that the bind/startTLS response * will update the timestamp as if it were a heart beat. * will update the lastResponseTimestamp as if it were a heart beat. */ if (sync.tryLockExclusively()) { try { connection.searchAsync(heartBeatRequest, null, this); } catch (final Exception e) { connection.searchAsync(heartBeatRequest, null, heartBeatHandler); return true; } catch (final IllegalStateException e) { /* * This may happen when we attempt to send the heart beat * just after the connection is closed but before we are @@ -688,74 +924,34 @@ releaseHeartBeatLock(); } } return false; } private <R> R timestamp(final R response) { updateTimestamp(); if (!(response instanceof ConnectionException)) { lastResponseTimestamp = timeSource.currentTimeMillis(); } return response; } private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler) { return timestamper(handler, false); private <R> WrappedResultHandler<R> wrap(final ResultHandler<? super R> handler) { final WrappedResultHandler<R> h = new WrappedResultHandler<R>(handler); pendingResults.add(h); return h; } private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler, final boolean isBindOrStartTLS) { return new ResultHandler<R>() { @Override public void handleErrorResult(final ErrorResultException error) { releaseIfNeeded(); if (handler != null) { handler.handleErrorResult(timestamp(error)); } else { timestamp(error); } } @Override public void handleResult(final R result) { releaseIfNeeded(); if (handler != null) { handler.handleResult(timestamp(result)); } else { timestamp(result); } } private void releaseIfNeeded() { if (isBindOrStartTLS) { releaseBindOrStartTLSLock(); } } }; private WrappedSearchResultHandler wrap(final SearchResultHandler handler) { final WrappedSearchResultHandler h = new WrappedSearchResultHandler(handler); pendingResults.add(h); return h; } private SearchResultHandler timestamper(final SearchResultHandler handler) { return new SearchResultHandler() { @Override public boolean handleEntry(final SearchResultEntry entry) { return handler.handleEntry(timestamp(entry)); } @Override public void handleErrorResult(final ErrorResultException error) { handler.handleErrorResult(timestamp(error)); } @Override public boolean handleReference(final SearchResultReference reference) { return handler.handleReference(timestamp(reference)); } @Override public void handleResult(final Result result) { handler.handleResult(timestamp(result)); } }; } private void updateTimestamp() { timestamp = currentTimeMillis(); private <R> WrappedBindOrStartTLSResultHandler<R> wrapForBindOrStartTLS( final ResultHandler<? super R> handler) { final WrappedBindOrStartTLSResultHandler<R> h = new WrappedBindOrStartTLSResultHandler<R>(handler); pendingResults.add(h); return h; } } @@ -791,13 +987,13 @@ * </ul> */ private static final class Sync extends AbstractQueuedSynchronizer { /* Lock states. Positive values indicate that the shared lock is taken. */ private static final int UNLOCKED = 0; // initial state private static final int LOCKED_EXCLUSIVELY = -1; // Keep compiler quiet. private static final long serialVersionUID = -3590428415442668336L; /* Lock states. Positive values indicate that the shared lock is taken. */ private static final int UNLOCKED = 0; // initial state @Override protected boolean isHeldExclusively() { return getState() == LOCKED_EXCLUSIVELY; @@ -866,10 +1062,6 @@ return tryAcquireShared(1) > 0; } boolean tryLockShared(final long timeout, final TimeUnit unit) throws InterruptedException { return tryAcquireSharedNanos(1, unit.toNanos(timeout)); } void unlockExclusively() { release(0 /* unused */); } @@ -880,60 +1072,122 @@ } /** * Default heart beat which will target the root DSE but not return any * results. */ private static final SearchRequest DEFAULT_SEARCH = Requests.newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1"); SearchScope.BASE_OBJECT, "(objectClass=1.1)", "1.1"); private final List<ConnectionImpl> activeConnections; /** * This is package private in order to allow unit tests to inject fake time * stamps. */ TimeSource timeSource = TimeSource.DEFAULT; /** * Scheduled task which checks that all heart beats have been received * within the timeout period. */ private final Runnable checkHeartBeatRunnable = new Runnable() { @Override public void run() { for (final ConnectionImpl connection : getValidConnections()) { connection.checkForHeartBeat(); } } }; /** * Underlying connection factory. */ private final ConnectionFactory factory; /** * The heartbeat scheduled future - which may be null if heartbeats are not * being sent (no valid connections). */ private ScheduledFuture<?> heartBeatFuture; /** * The heartbeat search request. */ private final SearchRequest heartBeatRequest; /** * The interval between successive heartbeats. */ private final long interval; private final TimeUnit intervalUnit; /** * Flag which indicates whether this factory has been closed. */ private final AtomicBoolean isClosed = new AtomicBoolean(); /** * The minimum amount of time the connection should remain idle (no * responses) before starting to send heartbeats. */ private final long minDelayMS; /** * The heartbeat scheduler. */ private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler; /** * Scheduled task which sends heart beats for all valid idle connections. */ private final Runnable sendHeartBeatRunnable = new Runnable() { @Override public void run() { boolean heartBeatSent = false; for (final ConnectionImpl connection : getValidConnections()) { heartBeatSent |= connection.sendHeartBeat(); } if (heartBeatSent) { scheduler.get().schedule(checkHeartBeatRunnable, timeoutMS, TimeUnit.MILLISECONDS); } } }; /** * The heartbeat timeout in milli-seconds. The connection will be marked as * failed if no heartbeat response is received within the timeout. */ private final long timeoutMS; private final TimeUnit unit; private AtomicBoolean isClosed = new AtomicBoolean(); HeartBeatConnectionFactory(final ConnectionFactory factory) { this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, null); } /** * List of valid connections to which heartbeats will be sent. */ private final List<ConnectionImpl> validConnections = new LinkedList<ConnectionImpl>(); HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, final TimeUnit unit) { this(factory, interval, unit, DEFAULT_SEARCH, null); } HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, final TimeUnit unit, final SearchRequest heartBeat) { this(factory, interval, unit, heartBeat, null); } HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, final TimeUnit unit, final SearchRequest heartBeat, final long timeout, final TimeUnit unit, final SearchRequest heartBeat, final ScheduledExecutorService scheduler) { Validator.ensureNotNull(factory, heartBeat, unit); Validator.ensureTrue(interval >= 0, "negative timeout"); Validator.ensureNotNull(factory, unit); Validator.ensureTrue(interval >= 0, "negative interval"); Validator.ensureTrue(timeout >= 0, "negative timeout"); this.heartBeatRequest = heartBeat; this.heartBeatRequest = heartBeat != null ? heartBeat : DEFAULT_SEARCH; this.interval = interval; this.unit = unit; this.activeConnections = new LinkedList<ConnectionImpl>(); this.intervalUnit = unit; this.factory = factory; this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler); this.timeoutMS = unit.toMillis(interval) * 2; this.timeoutMS = unit.toMillis(timeout); this.minDelayMS = unit.toMillis(interval) / 2; } @Override public void close() { if (isClosed.compareAndSet(false, true)) { synchronized (activeConnections) { if (!activeConnections.isEmpty()) { synchronized (validConnections) { if (!validConnections.isEmpty()) { if (DEBUG_LOG.isLoggable(Level.FINE)) { DEBUG_LOG.fine(String.format( "HeartbeatConnectionFactory '%s' is closing while %d " + "active connections remain", toString(), activeConnections.size())); + "active connections remain", toString(), validConnections .size())); } } } @@ -944,23 +1198,41 @@ @Override public Connection getConnection() throws ErrorResultException { return adaptConnection(factory.getConnection()); /* * Immediately send a heart beat in order to determine if the connected * server is responding. */ final Connection connection = factory.getConnection(); boolean keepConnection = false; try { connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS, TimeUnit.MILLISECONDS); keepConnection = true; return adaptConnection(connection); } catch (final Exception e) { throw adaptHeartBeatError(e); } finally { if (!keepConnection) { connection.close(); } } } @Override public FutureResult<Connection> getConnectionAsync( final ResultHandler<? super Connection> handler) { final FutureResultTransformer<Connection, Connection> future = new FutureResultTransformer<Connection, Connection>(handler) { @Override protected Connection transformResult(final Connection connection) throws ErrorResultException { return adaptConnection(connection); } }; // Create a future responsible for chaining the initial heartbeat search. final ConnectionFutureResultImpl compositeFuture = new ConnectionFutureResultImpl(handler); future.setFutureResult(factory.getConnectionAsync(future)); return future; // Request a connection. final FutureResult<Connection> connectionFuture = factory.getConnectionAsync(compositeFuture.futureConnectionResult); // Set the connection future in the composite so that the returned search future can delegate. compositeFuture.futureConnectionResult.setFutureResult(connectionFuture); // Return the future representing the heartbeat. return compositeFuture.futureSearchResult; } @Override @@ -973,26 +1245,42 @@ } private Connection adaptConnection(final Connection connection) { final ConnectionImpl heartBeatConnection = new ConnectionImpl(connection); synchronized (activeConnections) { connection.addConnectionEventListener(heartBeatConnection); if (activeConnections.isEmpty()) { synchronized (validConnections) { final ConnectionImpl heartBeatConnection = new ConnectionImpl(connection); if (validConnections.isEmpty()) { /* This is the first active connection, so start the heart beat. */ heartBeatFuture = scheduler.get().scheduleWithFixedDelay(new Runnable() { @Override public void run() { final ConnectionImpl[] tmp; synchronized (activeConnections) { tmp = activeConnections.toArray(new ConnectionImpl[0]); } for (final ConnectionImpl connection : tmp) { connection.sendHeartBeat(); } } }, 0, interval, unit); heartBeatFuture = scheduler.get().scheduleWithFixedDelay(sendHeartBeatRunnable, 0, interval, intervalUnit); } activeConnections.add(heartBeatConnection); validConnections.add(heartBeatConnection); return heartBeatConnection; } return heartBeatConnection; } private ErrorResultException adaptHeartBeatError(final Exception error) { if (error instanceof ConnectionException) { return (ErrorResultException) error; } else if (error instanceof TimeoutResultException || error instanceof TimeoutException) { return newHeartBeatTimeoutError(); } else if (error instanceof InterruptedException) { return newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, error); } else { return newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, HBCF_HEARTBEAT_FAILED.get(), error); } } private ConnectionImpl[] getValidConnections() { final ConnectionImpl[] tmp; synchronized (validConnections) { tmp = validConnections.toArray(new ConnectionImpl[0]); } return tmp; } private ErrorResultException newHeartBeatTimeoutError() { return newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, HBCF_HEARTBEAT_TIMEOUT .get(timeoutMS)); } } opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
@@ -1418,3 +1418,7 @@ ERR_ENTRY_INCREMENT_CANNOT_PARSE_AS_INT=Unable to increment the value of \ attribute '%s' because either the current value or the increment could \ not be parsed as an integer HBCF_CONNECTION_CLOSED_BY_CLIENT=Connection closed by client HBCF_HEARTBEAT_FAILED=Heartbeat failed HBCF_HEARTBEAT_TIMEOUT=Heartbeat timed out after %d ms opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/ConnectionStateTest.java
New file @@ -0,0 +1,250 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2013 ForgeRock AS. */ package com.forgerock.opendj.ldap; import static org.fest.assertions.Assertions.assertThat; import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult; import static org.forgerock.opendj.ldap.responses.Responses.newGenericExtendedResult; import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import org.forgerock.opendj.ldap.ConnectionEventListener; import org.forgerock.opendj.ldap.ErrorResultException; import org.forgerock.opendj.ldap.ResultCode; import org.forgerock.opendj.ldap.responses.ExtendedResult; import org.testng.annotations.Test; /** * Tests for {@linkConnectionState}. */ @SuppressWarnings("javadoc") public class ConnectionStateTest extends LDAPTestCase { private static final ErrorResultException ERROR = newErrorResult(ResultCode.OTHER); private static final ErrorResultException LATE_ERROR = newErrorResult(ResultCode.BUSY); private static final ExtendedResult UNSOLICITED = newGenericExtendedResult(ResultCode.OPERATIONS_ERROR); @Test public void testCloseEventInClosedState() { final ConnectionState state = new ConnectionState(); final ConnectionEventListener listener1 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener1); state.notifyConnectionClosed(); state.notifyConnectionClosed(); assertThat(state.isValid()).isFalse(); assertThat(state.isClosed()).isTrue(); assertThat(state.getConnectionError()).isNull(); verify(listener1).handleConnectionClosed(); verifyNoMoreInteractions(listener1); // Listeners registered after event should be notified immediately. final ConnectionEventListener listener2 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener2); verify(listener2).handleConnectionClosed(); verifyNoMoreInteractions(listener2); } @Test public void testCloseEventInErrorState() { final ConnectionState state = new ConnectionState(); final ConnectionEventListener listener1 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener1); state.notifyConnectionError(false, ERROR); state.notifyConnectionClosed(); assertThat(state.isValid()).isFalse(); assertThat(state.isClosed()).isTrue(); assertThat(state.getConnectionError()).isSameAs(ERROR); verify(listener1).handleConnectionError(false, ERROR); verify(listener1).handleConnectionClosed(); verifyNoMoreInteractions(listener1); // Listeners registered after event should be notified immediately. final ConnectionEventListener listener2 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener2); verify(listener2).handleConnectionError(false, ERROR); verify(listener2).handleConnectionClosed(); verifyNoMoreInteractions(listener2); } @Test public void testCloseEventInValidState() { final ConnectionState state = new ConnectionState(); final ConnectionEventListener listener1 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener1); state.notifyConnectionClosed(); assertThat(state.isValid()).isFalse(); assertThat(state.isClosed()).isTrue(); assertThat(state.getConnectionError()).isNull(); verify(listener1).handleConnectionClosed(); verifyNoMoreInteractions(listener1); // Listeners registered after event should be notified immediately. final ConnectionEventListener listener2 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener2); verify(listener2).handleConnectionClosed(); verifyNoMoreInteractions(listener2); } @Test public void testErrorEventInClosedState() { final ConnectionState state = new ConnectionState(); final ConnectionEventListener listener1 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener1); state.notifyConnectionClosed(); state.notifyConnectionError(false, ERROR); assertThat(state.isValid()).isFalse(); assertThat(state.isClosed()).isTrue(); assertThat(state.getConnectionError()).isNull(); verify(listener1).handleConnectionClosed(); verifyNoMoreInteractions(listener1); // Listeners registered after event should be notified immediately. final ConnectionEventListener listener2 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener2); verify(listener2).handleConnectionClosed(); verifyNoMoreInteractions(listener2); } @Test public void testErrorEventInErrorState() { final ConnectionState state = new ConnectionState(); final ConnectionEventListener listener1 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener1); state.notifyConnectionError(false, ERROR); state.notifyConnectionError(false, LATE_ERROR); assertThat(state.isValid()).isFalse(); assertThat(state.isClosed()).isFalse(); assertThat(state.getConnectionError()).isSameAs(ERROR); verify(listener1).handleConnectionError(false, ERROR); verifyNoMoreInteractions(listener1); // Listeners registered after event should be notified immediately. final ConnectionEventListener listener2 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener2); assertThat(state.getConnectionError()).isSameAs(ERROR); verify(listener2).handleConnectionError(false, ERROR); verifyNoMoreInteractions(listener2); } @Test public void testErrorEventInValidState() { final ConnectionState state = new ConnectionState(); final ConnectionEventListener listener1 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener1); state.notifyConnectionError(false, ERROR); assertThat(state.isValid()).isFalse(); assertThat(state.isClosed()).isFalse(); assertThat(state.getConnectionError()).isSameAs(ERROR); verify(listener1).handleConnectionError(false, ERROR); verifyNoMoreInteractions(listener1); // Listeners registered after event should be notified immediately. final ConnectionEventListener listener2 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener2); verify(listener2).handleConnectionError(false, ERROR); verifyNoMoreInteractions(listener2); } @Test public void testRemoveEventListener() { final ConnectionState state = new ConnectionState(); final ConnectionEventListener listener = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener); state.notifyConnectionError(false, ERROR); verify(listener).handleConnectionError(false, ERROR); state.removeConnectionEventListener(listener); state.notifyConnectionClosed(); verifyNoMoreInteractions(listener); } @Test public void testUnsolicitedNotificationEventInClosedState() { final ConnectionState state = new ConnectionState(); final ConnectionEventListener listener1 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener1); state.notifyConnectionClosed(); state.notifyUnsolicitedNotification(UNSOLICITED); assertThat(state.isValid()).isFalse(); assertThat(state.isClosed()).isTrue(); assertThat(state.getConnectionError()).isNull(); verify(listener1).handleConnectionClosed(); verifyNoMoreInteractions(listener1); } @Test public void testUnsolicitedNotificationEventInErrorState() { final ConnectionState state = new ConnectionState(); final ConnectionEventListener listener1 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener1); state.notifyConnectionError(false, ERROR); state.notifyUnsolicitedNotification(UNSOLICITED); assertThat(state.isValid()).isFalse(); assertThat(state.isClosed()).isFalse(); assertThat(state.getConnectionError()).isSameAs(ERROR); verify(listener1).handleConnectionError(false, ERROR); verifyNoMoreInteractions(listener1); } @Test public void testUnsolicitedNotificationEventInValidState() { final ConnectionState state = new ConnectionState(); final ConnectionEventListener listener1 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener1); state.notifyUnsolicitedNotification(UNSOLICITED); assertThat(state.isValid()).isTrue(); assertThat(state.isClosed()).isFalse(); assertThat(state.getConnectionError()).isNull(); verify(listener1).handleUnsolicitedNotification(same(UNSOLICITED)); verifyNoMoreInteractions(listener1); /* * Listeners registered after event will not be notified (unsolicited * notifications are not cached). */ final ConnectionEventListener listener2 = mock(ConnectionEventListener.class); state.addConnectionEventListener(listener2); verifyNoMoreInteractions(listener2); } @Test public void testValidState() { final ConnectionState state = new ConnectionState(); assertThat(state.isValid()).isTrue(); assertThat(state.isClosed()).isFalse(); assertThat(state.getConnectionError()).isNull(); } } opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/FutureResultTransformerTestCase.java
New file @@ -0,0 +1,132 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2013 ForgeRock AS. */ package com.forgerock.opendj.util; import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Fail.fail; import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import java.util.concurrent.TimeUnit; import org.forgerock.opendj.ldap.ErrorResultException; import org.forgerock.opendj.ldap.ResultCode; import org.forgerock.opendj.ldap.ResultHandler; import org.testng.annotations.Test; /** * Tests {@link FutureResultTransformer}. */ @SuppressWarnings("javadoc") public class FutureResultTransformerTestCase extends UtilTestCase { private static final class TestFuture extends FutureResultTransformer<Integer, String> { public TestFuture(final ResultHandler<? super String> handler) { super(handler); } @Override protected ErrorResultException transformErrorResult(final ErrorResultException error) { assertThat(error).isSameAs(UNTRANSFORMED_ERROR); return TRANSFORMED_ERROR; } @Override protected String transformResult(final Integer result) throws ErrorResultException { assertThat(result).isSameAs(UNTRANSFORMED_RESULT); return TRANSFORMED_RESULT; } } private static final ErrorResultException TRANSFORMED_ERROR = newErrorResult(ResultCode.OTHER, "transformed"); private static final String TRANSFORMED_RESULT = "transformed"; private static final ErrorResultException UNTRANSFORMED_ERROR = newErrorResult( ResultCode.OTHER, "untransformed"); private static final Integer UNTRANSFORMED_RESULT = Integer.valueOf(0); @Test public void testGetTransformsError() throws Exception { final TestFuture future = new TestFuture(null); future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_ERROR)); future.handleErrorResult(UNTRANSFORMED_ERROR); try { future.get(); fail(); } catch (final ErrorResultException e) { assertThat(e).isSameAs(TRANSFORMED_ERROR); } } @Test public void testGetTransformsResult() throws Exception { final TestFuture future = new TestFuture(null); future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_RESULT)); future.handleResult(UNTRANSFORMED_RESULT); assertThat(future.get()).isSameAs(TRANSFORMED_RESULT); } @Test public void testGetWithTimeoutTransformsError() throws Exception { final TestFuture future = new TestFuture(null); future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_ERROR)); future.handleErrorResult(UNTRANSFORMED_ERROR); try { future.get(100, TimeUnit.SECONDS); fail(); } catch (final ErrorResultException e) { assertThat(e).isSameAs(TRANSFORMED_ERROR); } } @Test public void testGetWithTimeoutTransformsResult() throws Exception { final TestFuture future = new TestFuture(null); future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_RESULT)); future.handleResult(UNTRANSFORMED_RESULT); assertThat(future.get(100, TimeUnit.SECONDS)).isSameAs(TRANSFORMED_RESULT); } @Test public void testResultHandlerTransformsError() throws Exception { @SuppressWarnings("unchecked") final ResultHandler<String> handler = mock(ResultHandler.class); final TestFuture future = new TestFuture(handler); future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_ERROR)); future.handleErrorResult(UNTRANSFORMED_ERROR); verify(handler).handleErrorResult(TRANSFORMED_ERROR); } @Test public void testResultHandlerTransformsResult() throws Exception { @SuppressWarnings("unchecked") final ResultHandler<String> handler = mock(ResultHandler.class); final TestFuture future = new TestFuture(handler); future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_RESULT)); future.handleResult(UNTRANSFORMED_RESULT); verify(handler).handleResult(TRANSFORMED_RESULT); } } opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java
@@ -181,7 +181,7 @@ assertThat(scheduler.isScheduled()).isTrue(); // Forcefully run the monitor task and check that the first factory is online. scheduler.getCommand().run(); scheduler.runFirstTask(); verify(listener).handleConnectionFactoryOnline(firstAsync); verifyNoMoreInteractions(listener); } finally { opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -22,7 +22,7 @@ * * * Copyright 2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS * Portions copyright 2011-2013 ForgeRock AS */ package org.forgerock.opendj.ldap; @@ -138,8 +138,8 @@ "objectclass=*", "cn"); factories[0][0] = new HeartBeatConnectionFactory(new LDAPConnectionFactory(getServerSocketAddress()), 1000, TimeUnit.MILLISECONDS, request); Connections.newHeartBeatConnectionFactory(new LDAPConnectionFactory( getServerSocketAddress()), 1000, 500, TimeUnit.MILLISECONDS, request); // InternalConnectionFactory factories[1][0] = Connections.newInternalConnectionFactory(LDAPServer.getInstance(), null); opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
@@ -44,7 +44,6 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.forgerock.opendj.ldap.requests.BindRequest; @@ -448,10 +447,7 @@ assertThat(scheduler.isScheduled()).isTrue(); // First populate the pool with idle connections at time 0. @SuppressWarnings("unchecked") final Callable<Long> timeSource = mock(Callable.class); when(timeSource.call()).thenReturn(0L); pool.testTimeSource = timeSource; pool.timeSource = mockTimeSource(0); assertThat(pool.currentPoolSize()).isEqualTo(0); Connection c1 = pool.getConnection(); @@ -466,13 +462,13 @@ assertThat(pool.currentPoolSize()).isEqualTo(4); // First purge at time 50 is no-op because no connections have expired. when(timeSource.call()).thenReturn(50L); scheduler.getCommand().run(); when(pool.timeSource.currentTimeMillis()).thenReturn(50L); scheduler.runFirstTask(); assertThat(pool.currentPoolSize()).isEqualTo(4); // Second purge at time 150 should remove 2 non-core connections. when(timeSource.call()).thenReturn(150L); scheduler.getCommand().run(); when(pool.timeSource.currentTimeMillis()).thenReturn(150L); scheduler.runFirstTask(); assertThat(pool.currentPoolSize()).isEqualTo(2); verify(pooledConnection1, times(1)).close(); @@ -481,7 +477,7 @@ verify(pooledConnection4, times(0)).close(); // Regrow the pool at time 200. when(timeSource.call()).thenReturn(200L); when(pool.timeSource.currentTimeMillis()).thenReturn(200L); Connection c5 = pool.getConnection(); // pooledConnection3 Connection c6 = pool.getConnection(); // pooledConnection4 Connection c7 = pool.getConnection(); // pooledConnection5 @@ -494,13 +490,13 @@ assertThat(pool.currentPoolSize()).isEqualTo(4); // Third purge at time 250 should not remove any connections. when(timeSource.call()).thenReturn(250L); scheduler.getCommand().run(); when(pool.timeSource.currentTimeMillis()).thenReturn(250L); scheduler.runFirstTask(); assertThat(pool.currentPoolSize()).isEqualTo(4); // Fourth purge at time 350 should remove 2 non-core connections. when(timeSource.call()).thenReturn(350L); scheduler.getCommand().run(); when(pool.timeSource.currentTimeMillis()).thenReturn(350L); scheduler.runFirstTask(); assertThat(pool.currentPoolSize()).isEqualTo(2); verify(pooledConnection3, times(1)).close(); opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
New file @@ -0,0 +1,410 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2013 ForgeRock AS. */ package org.forgerock.opendj.ldap; import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Fail.fail; import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult; import static org.forgerock.opendj.ldap.SearchScope.BASE_OBJECT; import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnection; import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnectionFactory; import static org.forgerock.opendj.ldap.TestCaseUtils.mockTimeSource; import static org.forgerock.opendj.ldap.requests.Requests.newModifyRequest; import static org.forgerock.opendj.ldap.requests.Requests.newSearchRequest; import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest; import static org.forgerock.opendj.ldap.responses.Responses.newResult; import static org.mockito.Matchers.any; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import org.forgerock.opendj.ldap.requests.BindRequest; import org.forgerock.opendj.ldap.requests.SearchRequest; import org.forgerock.opendj.ldap.responses.Result; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.forgerock.opendj.util.CompletedFutureResult; import com.forgerock.opendj.util.StaticUtils; /** * Tests the connection pool implementation.. */ @SuppressWarnings("javadoc") public class HeartBeatConnectionFactoryTestCase extends SdkTestCase { // @formatter:off /* * Key features which need testing: * * - lazy scheduler registration and deregistration * - scheduled task only sends heart-beat when connection is open * - connection remains valid when any response is received * - connection remains valid when a heart beat response is received * - connection becomes invalid if no response is received during timeout * - heart beat only sent when connection is idle * - slow bind / startTLS prevents heart beat * - slow heart beat prevents bind / start TLS * - support concurrent bind / start TLS */ // @formatter:on private static final SearchRequest HEARTBEAT = newSearchRequest("dc=test", BASE_OBJECT, "(objectclass=*)", "1.1"); private Connection connection; private ConnectionFactory factory; private Connection hbc; private HeartBeatConnectionFactory hbcf; private List<ConnectionEventListener> listeners; private MockScheduler scheduler; /** * Disables logging before the tests. */ @BeforeClass() public void disableLogging() { StaticUtils.DEBUG_LOG.setLevel(Level.SEVERE); } /** * Re-enable logging after the tests. */ @AfterClass() public void enableLogging() { StaticUtils.DEBUG_LOG.setLevel(Level.INFO); } @AfterMethod(alwaysRun = true) public void tearDown() { try { if (hbc != null) { hbc.close(); assertThat(hbc.isValid()).isFalse(); assertThat(hbc.isClosed()).isTrue(); } scheduler.runAllTasks(); // Flush any remaining timeout tasks. assertThat(scheduler.isScheduled()).isFalse(); // No more connections to check. hbcf.close(); } finally { connection = null; factory = null; hbcf = null; listeners = null; scheduler = null; hbc = null; } } @SuppressWarnings("unchecked") @Test public void testBindWhileHeartBeatInProgress() throws Exception { // Mock connection with successful heartbeat. init(ResultCode.SUCCESS); // Get a connection. hbc = hbcf.getConnection(); /* * Send a heartbeat, trapping the search call-back so that we can send * the response once we have attempted a bind. */ when( connection.searchAsync(any(SearchRequest.class), any(IntermediateResponseHandler.class), any(SearchResultHandler.class))) .thenReturn(null); when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L); scheduler.runAllTasks(); // Send the heartbeat. // Capture the heartbeat search result handler. final ArgumentCaptor<SearchResultHandler> arg = ArgumentCaptor.forClass(SearchResultHandler.class); verify(connection, times(2)).searchAsync(same(HEARTBEAT), any(IntermediateResponseHandler.class), arg.capture()); assertThat(hbc.isValid()).isTrue(); // Not checked yet. /* * Now attempt a bind request, which should be held in a queue until the * heart beat completes. */ hbc.bindAsync(newSimpleBindRequest(), null, null); verify(connection, times(0)).bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class), any(ResultHandler.class)); // Send fake heartbeat response, releasing the bind request. arg.getValue().handleResult(newResult(ResultCode.SUCCESS)); verify(connection, times(1)).bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class), any(ResultHandler.class)); } @Test public void testGetConnection() throws Exception { // Mock connection with successful initial heartbeat. init(ResultCode.SUCCESS); hbc = hbcf.getConnection(); assertThat(hbc).isNotNull(); assertThat(hbc.isValid()).isTrue(); } @Test public void testGetConnectionAsync() throws Exception { // Mock connection with successful initial heartbeat. init(ResultCode.SUCCESS); hbc = hbcf.getConnectionAsync(null).get(); assertThat(hbc).isNotNull(); assertThat(hbc.isValid()).isTrue(); } @Test public void testGetConnectionAsyncWithInitialHeartBeatError() throws Exception { // Mock connection with failing initial heartbeat. init(ResultCode.BUSY); try { hbcf.getConnectionAsync(null).get(); fail("Unexpectedly obtained a connection"); } catch (final ErrorResultException e) { checkInitialHeartBeatFailure(e); } } @Test public void testGetConnectionWithInitialHeartBeatError() throws Exception { // Mock connection with failing initial heartbeat. init(ResultCode.BUSY); try { hbcf.getConnection(); fail("Unexpectedly obtained a connection"); } catch (final ErrorResultException e) { checkInitialHeartBeatFailure(e); } } @Test public void testHeartBeatSucceedsThenFails() throws Exception { // Mock connection with successful heartbeat. init(ResultCode.SUCCESS); // Get a connection and check that it was pinged. hbc = hbcf.getConnection(); verifyHeartBeatSent(connection, 1); assertThat(scheduler.isScheduled()).isTrue(); // heartbeater assertThat(listeners).hasSize(1); assertThat(hbc.isValid()).isTrue(); // Invoke heartbeat before the connection is considered idle. scheduler.runAllTasks(); verifyHeartBeatSent(connection, 1); // No heartbeat sent - not idle yet. assertThat(hbc.isValid()).isTrue(); // Invoke heartbeat after the connection is considered idle. when(hbcf.timeSource.currentTimeMillis()).thenReturn(6000L); scheduler.runAllTasks(); verifyHeartBeatSent(connection, 2); // Heartbeat sent. assertThat(hbc.isValid()).isTrue(); // Now force the heartbeat to fail. mockHeartBeatResponse(connection, listeners, ResultCode.CLIENT_SIDE_SERVER_DOWN); when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L); scheduler.runAllTasks(); verifyHeartBeatSent(connection, 3); assertThat(hbc.isValid()).isFalse(); // Flush redundant timeout tasks. scheduler.runAllTasks(); // Attempt to send a new request: it should fail immediately. @SuppressWarnings("unchecked") final ResultHandler<Result> mockHandler = mock(ResultHandler.class); hbc.modifyAsync(newModifyRequest(DN.rootDN()), null, mockHandler); final ArgumentCaptor<ErrorResultException> arg = ArgumentCaptor.forClass(ErrorResultException.class); verify(mockHandler).handleErrorResult(arg.capture()); assertThat(arg.getValue().getResult().getResultCode()).isEqualTo( ResultCode.CLIENT_SIDE_SERVER_DOWN); assertThat(hbc.isValid()).isFalse(); assertThat(hbc.isClosed()).isFalse(); } @Test public void testHeartBeatTimeout() throws Exception { // Mock connection with successful heartbeat. init(ResultCode.SUCCESS); // Get a connection and check that it was pinged. hbc = hbcf.getConnection(); // Now force the heartbeat to fail due to timeout. mockHeartBeatResponse(connection, listeners, null /* no response */); when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L); scheduler.runAllTasks(); // Send the heartbeat. verifyHeartBeatSent(connection, 2); assertThat(hbc.isValid()).isTrue(); // Not checked yet. when(hbcf.timeSource.currentTimeMillis()).thenReturn(12000L); scheduler.runAllTasks(); // Check for heartbeat. assertThat(hbc.isValid()).isFalse(); // Now invalid. assertThat(hbc.isClosed()).isFalse(); } @SuppressWarnings("unchecked") @Test public void testHeartBeatWhileBindInProgress() throws Exception { // Mock connection with successful heartbeat. init(ResultCode.SUCCESS); // Get a connection. hbc = hbcf.getConnection(); /* * Send a bind request, trapping the bind call-back so that we can send * the response once we have attempted a heartbeat. */ when( connection.bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class), any(ResultHandler.class))) .thenReturn(null); hbc.bindAsync(newSimpleBindRequest(), null, null); // Capture the bind result handler. @SuppressWarnings("rawtypes") final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class); verify(connection, times(1)).bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class), arg.capture()); /* * Now attempt the heartbeat which should not happen because there is a * bind in progress. */ when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L); scheduler.runAllTasks(); // Attempt to send the heartbeat. verify(connection, times(1)).searchAsync(same(HEARTBEAT), any(IntermediateResponseHandler.class), any(SearchResultHandler.class)); // Send fake bind response, releasing the heartbeat. arg.getValue().handleResult(newResult(ResultCode.SUCCESS)); // Attempt to send a heartbeat again. when(hbcf.timeSource.currentTimeMillis()).thenReturn(16000L); scheduler.runAllTasks(); // Attempt to send the heartbeat. verify(connection, times(2)).searchAsync(same(HEARTBEAT), any(IntermediateResponseHandler.class), any(SearchResultHandler.class)); } @Test public void testToString() { init(ResultCode.SUCCESS); assertThat(hbcf.toString()).isNotNull(); } private void checkInitialHeartBeatFailure(final ErrorResultException e) { /* * Initial heartbeat failure should trigger connection exception with * heartbeat cause. */ assertThat(e).isInstanceOf(ConnectionException.class); assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_SERVER_DOWN); assertThat(e.getCause()).isInstanceOf(ErrorResultException.class); assertThat(((ErrorResultException) e.getCause()).getResult().getResultCode()).isEqualTo( ResultCode.BUSY); } private void init(final ResultCode initialHeartBeatResult) { // Mock connection with successful heartbeat. listeners = new LinkedList<ConnectionEventListener>(); connection = mockConnection(listeners); when(connection.isValid()).thenReturn(true); mockHeartBeatResponse(connection, listeners, initialHeartBeatResult); // Underlying connection factory. factory = mockConnectionFactory(connection); // Create heart beat connection factory. scheduler = new MockScheduler(); hbcf = new HeartBeatConnectionFactory(factory, 10000, 100, TimeUnit.MILLISECONDS, HEARTBEAT, scheduler); // Set initial time stamp. hbcf.timeSource = mockTimeSource(0); } private Connection mockHeartBeatResponse(final Connection mockConnection, final List<ConnectionEventListener> listeners, final ResultCode resultCode) { doAnswer(new Answer<FutureResult<Result>>() { @Override public FutureResult<Result> answer(final InvocationOnMock invocation) throws Throwable { if (resultCode == null) { return null; } final SearchResultHandler handler = (SearchResultHandler) invocation.getArguments()[2]; if (resultCode.isExceptional()) { final ErrorResultException error = newErrorResult(resultCode); if (handler != null) { handler.handleErrorResult(error); } if (error instanceof ConnectionException) { for (final ConnectionEventListener listener : listeners) { listener.handleConnectionError(false, error); } } return new CompletedFutureResult<Result>(error); } else { final Result result = newResult(resultCode); if (handler != null) { handler.handleResult(result); } return new CompletedFutureResult<Result>(result); } } }).when(mockConnection).searchAsync(any(SearchRequest.class), any(IntermediateResponseHandler.class), any(SearchResultHandler.class)); return mockConnection; } private void verifyHeartBeatSent(final Connection connection, final int times) { verify(connection, times(times)).searchAsync(same(HEARTBEAT), any(IntermediateResponseHandler.class), any(SearchResultHandler.class)); } } opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java
@@ -25,10 +25,14 @@ */ package org.forgerock.opendj.ldap; import static java.util.concurrent.Executors.callable; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -43,11 +47,74 @@ */ final class MockScheduler implements ScheduledExecutorService { // Saved scheduled task. private Runnable command; private long delay; private boolean isScheduled = false; private TimeUnit unit; private final class ScheduledCallableFuture<T> implements ScheduledFuture<T>, Callable<T> { private final Callable<T> callable; private final CountDownLatch isDone = new CountDownLatch(1); private final boolean removeAfterCall; private T result = null; private ScheduledCallableFuture(final Callable<T> callable, final boolean removeAfterCall) { this.callable = callable; this.removeAfterCall = removeAfterCall; } @Override public T call() throws Exception { result = callable.call(); isDone.countDown(); if (removeAfterCall) { tasks.remove(this); } return result; } @Override public boolean cancel(final boolean mayInterruptIfRunning) { return tasks.remove(this); } @Override public int compareTo(final Delayed o) { // Unused. return 0; } @Override public T get() throws InterruptedException, ExecutionException { isDone.await(); return result; } @Override public T get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (isDone.await(timeout, unit)) { return result; } else { throw new TimeoutException(); } } @Override public long getDelay(final TimeUnit unit) { // Unused. return 0; } @Override public boolean isCancelled() { return tasks.contains(this); } @Override public boolean isDone() { return isDone.getCount() == 0; } } // Saved scheduled tasks. private final List<Callable<?>> tasks = new CopyOnWriteArrayList<Callable<?>>(); MockScheduler() { // Nothing to do. @@ -109,74 +176,24 @@ @Override public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) { // Unused. return null; return onceOnly(callable); } @Override public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) { // Unused. return null; return onceOnly(callable(command)); } @Override public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { // Unused. return null; return repeated(callable(command)); } @Override public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) { this.command = command; this.delay = delay; this.unit = unit; this.isScheduled = true; return new ScheduledFuture<Object>() { @Override public boolean cancel(final boolean mayInterruptIfRunning) { isScheduled = false; return true; } @Override public int compareTo(final Delayed o) { // Unused. return 0; } @Override public Object get() throws InterruptedException, ExecutionException { // Unused. return null; } @Override public Object get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { // Unused. return null; } @Override public long getDelay(final TimeUnit unit) { // Unused. return 0; } @Override public boolean isCancelled() { return !isScheduled; } @Override public boolean isDone() { // Unused. return false; } }; return repeated(callable(command)); } @Override @@ -192,35 +209,62 @@ @Override public <T> Future<T> submit(final Callable<T> task) { // Unused. return null; return onceOnly(task); } @Override public Future<?> submit(final Runnable task) { // Unused. return null; return onceOnly(callable(task)); } @Override public <T> Future<T> submit(final Runnable task, final T result) { // Unused. return null; return onceOnly(callable(task, result)); } Runnable getCommand() { return command; List<Callable<?>> getAllTasks() { return tasks; } long getDelay() { return delay; } TimeUnit getUnit() { return unit; Callable<?> getFirstTask() { return tasks.get(0); } boolean isScheduled() { return isScheduled; return !tasks.isEmpty(); } void runAllTasks() { for (final Callable<?> task : tasks) { runTask0(task); } } void runFirstTask() { runTask(0); } void runTask(final int i) { runTask0(tasks.get(i)); } private <T> ScheduledCallableFuture<T> onceOnly(final Callable<T> callable) { final ScheduledCallableFuture<T> wrapped = new ScheduledCallableFuture<T>(callable, true); tasks.add(wrapped); return wrapped; } private <T> ScheduledCallableFuture<T> repeated(final Callable<T> callable) { final ScheduledCallableFuture<T> wrapped = new ScheduledCallableFuture<T>(callable, false); tasks.add(wrapped); return wrapped; } private void runTask0(final Callable<?> task) { try { task.call(); } catch (final Exception e) { throw new RuntimeException(e); } } } opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/SdkTestCase.java
@@ -33,7 +33,6 @@ * An abstract class that all types unit tests should extend. A type represents * the classes found directly under the package org.forgerock.opendj.ldap. */ @Test(groups = { "precommit", "types", "sdk" }) public abstract class SdkTestCase extends ForgeRockTestCase { } opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtils.java
@@ -22,7 +22,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions copyright 2012 ForgeRock AS. * Portions copyright 2012-2013 ForgeRock AS. */ package org.forgerock.opendj.ldap; @@ -42,8 +42,10 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.mockito.stubbing.OngoingStubbing; import com.forgerock.opendj.util.CompletedFutureResult; import com.forgerock.opendj.util.TimeSource; /** * This class defines some utility functions which can be used by test cases. @@ -202,4 +204,22 @@ return mockConnection; } /** * Returns a mock {@link TimeSource} which can be used for injecting fake * time stamps into components. * * @param times * The times in milli-seconds which should be returned by the * time source. * @return The mock time source. */ public static TimeSource mockTimeSource(final long... times) { final TimeSource mock = mock(TimeSource.class); OngoingStubbing<Long> stubbing = when(mock.currentTimeMillis()); for (long t : times) { stubbing = stubbing.thenReturn(t); } return mock; } } opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtilsTestCase.java
New file @@ -0,0 +1,53 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2013 ForgeRock AS */ package org.forgerock.opendj.ldap; import static org.fest.assertions.Assertions.assertThat; import static org.forgerock.opendj.ldap.TestCaseUtils.mockTimeSource; import org.testng.annotations.Test; import com.forgerock.opendj.util.TimeSource; @SuppressWarnings("javadoc") public class TestCaseUtilsTestCase extends SdkTestCase { /** * Test for {@link #mockTimeSource(long...)}. */ @Test public void testMockTimeSource() { final TimeSource mock1 = mockTimeSource(10); assertThat(mock1.currentTimeMillis()).isEqualTo(10); assertThat(mock1.currentTimeMillis()).isEqualTo(10); final TimeSource mock2 = mockTimeSource(10, 20, 30); assertThat(mock2.currentTimeMillis()).isEqualTo(10); assertThat(mock2.currentTimeMillis()).isEqualTo(20); assertThat(mock2.currentTimeMillis()).isEqualTo(30); assertThat(mock2.currentTimeMillis()).isEqualTo(30); } } opendj-rest2ldap-servlet/src/main/webapp/opendj-rest2ldap-servlet.json
@@ -35,8 +35,10 @@ // Re-usable pool of 24 connections per server. "connectionPoolSize" : 24, // Check pooled connections are alive every 30 seconds. "heartBeatIntervalSeconds" : 30, // Check pooled connections are alive every 30 seconds with a 500ms // heart beat timeout. "heartBeatIntervalSeconds" : 30, "heartBeatTimeoutMilliSeconds" : 500, // The preferred load-balancing pool. "primaryLDAPServers" : [ opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
@@ -970,6 +970,9 @@ Math.max(configuration.get("connectionPoolSize").defaultTo(10).asInteger(), 1); final int heartBeatIntervalSeconds = Math.max(configuration.get("heartBeatIntervalSeconds").defaultTo(30).asInteger(), 1); final int heartBeatTimeoutMilliSeconds = Math.max(configuration.get("heartBeatTimeoutMilliSeconds").defaultTo(500) .asInteger(), 100); // Parse authentication parameters. final BindRequest bindRequest; @@ -1036,7 +1039,7 @@ } final ConnectionFactory primary = parseLDAPServers(primaryLDAPServers, bindRequest, connectionPoolSize, heartBeatIntervalSeconds, options); heartBeatIntervalSeconds, heartBeatTimeoutMilliSeconds, options); // Parse secondary data center(s). final JsonValue secondaryLDAPServers = configuration.get("secondaryLDAPServers"); @@ -1045,7 +1048,7 @@ if (secondaryLDAPServers.size() > 0) { secondary = parseLDAPServers(secondaryLDAPServers, bindRequest, connectionPoolSize, heartBeatIntervalSeconds, options); heartBeatIntervalSeconds, heartBeatTimeoutMilliSeconds, options); } else { secondary = null; } @@ -1093,20 +1096,22 @@ private static ConnectionFactory parseLDAPServers(final JsonValue config, final BindRequest bindRequest, final int connectionPoolSize, final int heartBeatIntervalSeconds, final LDAPOptions options) { final int heartBeatIntervalSeconds, final int heartBeatTimeoutMilliSeconds, final LDAPOptions options) { final List<ConnectionFactory> servers = new ArrayList<ConnectionFactory>(config.size()); for (final JsonValue server : config) { final String host = server.get("hostname").required().asString(); final int port = server.get("port").required().asInteger(); ConnectionFactory factory = new LDAPConnectionFactory(host, port, options); factory = Connections.newHeartBeatConnectionFactory(factory, heartBeatIntervalSeconds * 1000, heartBeatTimeoutMilliSeconds, TimeUnit.MILLISECONDS); if (bindRequest != null) { factory = Connections.newAuthenticatedConnectionFactory(factory, bindRequest); } if (connectionPoolSize > 1) { factory = Connections.newHeartBeatConnectionFactory(factory, heartBeatIntervalSeconds, TimeUnit.SECONDS); factory = Connections.newCachedConnectionPool(factory, 0, connectionPoolSize, 60L, TimeUnit.SECONDS); }