From 3e1db08dce649f6d426aae67d506c8c23b15f6e8 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 18 Sep 2013 16:05:59 +0000
Subject: [PATCH] Fix OPENDJ-1058 – HeartbeatConnectionFactory does not actively shutdown dead connections
---
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java | 192 ++-
opendj3/opendj-rest2ldap-servlet/src/main/webapp/opendj-rest2ldap-servlet.json | 6
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/FutureResultTransformer.java | 18
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java | 2
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java | 6
opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/ConnectionStateTest.java | 250 ++++
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ErrorResultException.java | 6
opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java | 17
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtils.java | 22
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java | 1342 +++++++++++++++----------
opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/FutureResultTransformerTestCase.java | 132 ++
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/TimeSource.java | 52 +
opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties | 4
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java | 31
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java | 427 +++++--
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java | 31
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java | 24
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtilsTestCase.java | 53 +
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/ConnectionState.java | 387 +++++++
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/SdkTestCase.java | 1
20 files changed, 2,198 insertions(+), 805 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/ConnectionState.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/ConnectionState.java
new file mode 100644
index 0000000..5d4190a
--- /dev/null
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/ConnectionState.java
@@ -0,0 +1,387 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS.
+ */
+package com.forgerock.opendj.ldap;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.forgerock.opendj.ldap.ConnectionEventListener;
+import org.forgerock.opendj.ldap.ErrorResultException;
+import org.forgerock.opendj.ldap.responses.ExtendedResult;
+
+/**
+ * This class can be used to manage the internal state of a connection, ensuring
+ * valid and atomic state transitions, as well as connection event listener
+ * notification. There are 4 states:
+ * <ul>
+ * <li>connection is <b>valid</b> (isClosed()=false, isFailed()=false): can fail
+ * or be closed
+ * <li>connection has failed due to an <b>error</b> (isClosed()=false,
+ * isFailed()=true): can be closed
+ * <li>connection has been <b>closed</b> by the application (isClosed()=true,
+ * isFailed()=false): terminal state
+ * <li>connection has failed due to an <b>error</b> and has been <b>closed</b>
+ * by the application (isClosed()=true, isFailed()=true): terminal state
+ * </ul>
+ * All methods are synchronized and container classes may also synchronize on
+ * the state where needed. The state transition methods,
+ * {@link #notifyConnectionClosed()} and
+ * {@link #notifyConnectionError(boolean, ErrorResultException)}, correspond to
+ * methods in the {@link ConnectionEventListener} interface except that they
+ * return a boolean indicating whether the transition was successful or not.
+ */
+public final class ConnectionState {
+ /*
+ * FIXME: The synchronization in this class has been kept simple for now.
+ * However, ideally we should notify listeners without synchronizing on the
+ * state in case a listener takes a long time to complete.
+ */
+
+ /*
+ * FIXME: This class should be used by connection pool and ldap connection
+ * implementations as well.
+ */
+
+ /**
+ * Use the State design pattern to manage state transitions.
+ */
+ private enum State {
+
+ /**
+ * Connection has not encountered an error nor has it been closed
+ * (initial state).
+ */
+ VALID() {
+ @Override
+ void addConnectionEventListener(final ConnectionState cs,
+ final ConnectionEventListener listener) {
+ cs.listeners.add(listener);
+ }
+
+ @Override
+ boolean isClosed() {
+ return false;
+ }
+
+ @Override
+ boolean isFailed() {
+ return false;
+ }
+
+ @Override
+ boolean isValid() {
+ return true;
+ }
+
+ @Override
+ boolean notifyConnectionClosed(final ConnectionState cs) {
+ for (final ConnectionEventListener listener : cs.listeners) {
+ listener.handleConnectionClosed();
+ }
+ cs.state = CLOSED;
+ return true;
+ }
+
+ @Override
+ boolean notifyConnectionError(final ConnectionState cs,
+ final boolean isDisconnectNotification, final ErrorResultException error) {
+ // Transition from valid to error state.
+ cs.failedDueToDisconnect = isDisconnectNotification;
+ cs.connectionError = error;
+ for (final ConnectionEventListener listener : cs.listeners) {
+ // Use the reason provided in the disconnect notification.
+ listener.handleConnectionError(isDisconnectNotification, error);
+ }
+ cs.state = ERROR;
+ return true;
+ }
+
+ @Override
+ void notifyUnsolicitedNotification(final ConnectionState cs,
+ final ExtendedResult notification) {
+ for (final ConnectionEventListener listener : cs.listeners) {
+ listener.handleUnsolicitedNotification(notification);
+ }
+ }
+ },
+
+ /**
+ * Connection has encountered an error, but has not been closed.
+ */
+ ERROR() {
+ @Override
+ void addConnectionEventListener(final ConnectionState cs,
+ final ConnectionEventListener listener) {
+ listener.handleConnectionError(cs.failedDueToDisconnect, cs.connectionError);
+ cs.listeners.add(listener);
+ }
+
+ @Override
+ boolean isClosed() {
+ return false;
+ }
+
+ @Override
+ boolean isFailed() {
+ return true;
+ }
+
+ @Override
+ boolean isValid() {
+ return false;
+ }
+
+ @Override
+ boolean notifyConnectionClosed(final ConnectionState cs) {
+ for (final ConnectionEventListener listener : cs.listeners) {
+ listener.handleConnectionClosed();
+ }
+ cs.state = ERROR_CLOSED;
+ return true;
+ }
+ },
+
+ /**
+ * Connection has been closed (terminal state).
+ */
+ CLOSED() {
+ @Override
+ void addConnectionEventListener(final ConnectionState cs,
+ final ConnectionEventListener listener) {
+ listener.handleConnectionClosed();
+ }
+
+ @Override
+ boolean isClosed() {
+ return true;
+ }
+
+ @Override
+ boolean isFailed() {
+ return false;
+ }
+
+ @Override
+ boolean isValid() {
+ return false;
+ }
+ },
+
+ /**
+ * Connection has encountered an error and has been closed (terminal
+ * state).
+ */
+ ERROR_CLOSED() {
+ @Override
+ void addConnectionEventListener(final ConnectionState cs,
+ final ConnectionEventListener listener) {
+ listener.handleConnectionError(cs.failedDueToDisconnect, cs.connectionError);
+ listener.handleConnectionClosed();
+ }
+
+ @Override
+ boolean isClosed() {
+ return true;
+ }
+
+ @Override
+ boolean isFailed() {
+ return true;
+ }
+
+ @Override
+ boolean isValid() {
+ return false;
+ }
+ };
+
+ abstract void addConnectionEventListener(ConnectionState cs,
+ final ConnectionEventListener listener);
+
+ abstract boolean isClosed();
+
+ abstract boolean isFailed();
+
+ abstract boolean isValid();
+
+ boolean notifyConnectionClosed(final ConnectionState cs) {
+ return false;
+ }
+
+ boolean notifyConnectionError(final ConnectionState cs,
+ final boolean isDisconnectNotification, final ErrorResultException error) {
+ return false;
+ }
+
+ void notifyUnsolicitedNotification(final ConnectionState cs,
+ final ExtendedResult notification) {
+ // Do nothing by default.
+ }
+ }
+
+ /**
+ * Non-{@code null} once the connection has failed due to a connection
+ * error. Volatile so that it can be read without synchronization.
+ */
+ private volatile ErrorResultException connectionError = null;
+
+ /**
+ * {@code true} if the connection has failed due to a disconnect
+ * notification.
+ */
+ private boolean failedDueToDisconnect = false;
+
+ /**
+ * Registered event listeners.
+ */
+ private final List<ConnectionEventListener> listeners =
+ new LinkedList<ConnectionEventListener>();
+
+ /**
+ * Internal state implementation.
+ */
+ private volatile State state = State.VALID;
+
+ /**
+ * Creates a new connection state which is initially valid.
+ */
+ public ConnectionState() {
+ // Nothing to do.
+ }
+
+ /**
+ * Registers the provided connection event listener so that it will be
+ * notified when this connection is closed by the application, receives an
+ * unsolicited notification, or experiences a fatal error.
+ *
+ * @param listener
+ * The listener which wants to be notified when events occur on
+ * this connection.
+ * @throws IllegalStateException
+ * If this connection has already been closed, i.e. if
+ * {@code isClosed() == true}.
+ * @throws NullPointerException
+ * If the {@code listener} was {@code null}.
+ */
+ public synchronized void addConnectionEventListener(final ConnectionEventListener listener) {
+ state.addConnectionEventListener(this, listener);
+ }
+
+ /**
+ * Returns the error that caused the connection to fail, or {@code null} if
+ * the connection has not failed.
+ *
+ * @return The error that caused the connection to fail, or {@code null} if
+ * the connection has not failed.
+ */
+ public ErrorResultException getConnectionError() {
+ return connectionError;
+ }
+
+ /**
+ * Indicates whether or not this connection has been explicitly closed by
+ * calling {@code close}. This method will not return {@code true} if a
+ * fatal error has occurred on the connection unless {@code close} has been
+ * called.
+ *
+ * @return {@code true} if this connection has been explicitly closed by
+ * calling {@code close}, or {@code false} otherwise.
+ */
+ public boolean isClosed() {
+ return state.isClosed();
+ }
+
+ /**
+ * Returns {@code true} if the associated connection has not been closed and
+ * no fatal errors have been detected.
+ *
+ * @return {@code true} if this connection is valid, {@code false}
+ * otherwise.
+ */
+ public boolean isValid() {
+ return state.isValid();
+ }
+
+ /**
+ * Attempts to transition this connection state to closed and invokes event
+ * listeners if successful.
+ *
+ * @return {@code true} if the state changed to closed, or {@code false} if
+ * the state was already closed.
+ * @see ConnectionEventListener#handleConnectionClosed()
+ */
+ public synchronized boolean notifyConnectionClosed() {
+ return state.notifyConnectionClosed(this);
+ }
+
+ /**
+ * Attempts to transition this connection state to error and invokes event
+ * listeners if successful.
+ *
+ * @param isDisconnectNotification
+ * {@code true} if the error was triggered by a disconnect
+ * notification sent by the server, otherwise {@code false}.
+ * @param error
+ * The exception that is about to be thrown to the application.
+ * @return {@code true} if the state changed to error, or {@code false} if
+ * the state was already error or closed.
+ * @see ConnectionEventListener#handleConnectionError(boolean,
+ * ErrorResultException)
+ */
+ public synchronized boolean notifyConnectionError(final boolean isDisconnectNotification,
+ final ErrorResultException error) {
+ return state.notifyConnectionError(this, isDisconnectNotification, error);
+ }
+
+ /**
+ * Notifies event listeners of the provided unsolicited notification if the
+ * state is valid.
+ *
+ * @param notification
+ * The unsolicited notification.
+ * @see ConnectionEventListener#handleUnsolicitedNotification(ExtendedResult)
+ */
+ public synchronized void notifyUnsolicitedNotification(final ExtendedResult notification) {
+ state.notifyUnsolicitedNotification(this, notification);
+ }
+
+ /**
+ * Removes the provided connection event listener from this connection so
+ * that it will no longer be notified when this connection is closed by the
+ * application, receives an unsolicited notification, or experiences a fatal
+ * error.
+ *
+ * @param listener
+ * The listener which no longer wants to be notified when events
+ * occur on this connection.
+ * @throws NullPointerException
+ * If the {@code listener} was {@code null}.
+ */
+ public synchronized void removeConnectionEventListener(final ConnectionEventListener listener) {
+ listeners.remove(listener);
+ }
+
+}
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/FutureResultTransformer.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/FutureResultTransformer.java
index c68229d..83914d4 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/FutureResultTransformer.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/FutureResultTransformer.java
@@ -22,6 +22,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package com.forgerock.opendj.util;
@@ -44,15 +45,12 @@
* The type of the outer result.
*/
public abstract class FutureResultTransformer<M, N> implements FutureResult<N>, ResultHandler<M> {
-
private final ResultHandler<? super N> handler;
-
private volatile FutureResult<? extends M> future = null;
// These do not need to be volatile since the future acts as a memory
// barrier.
private N transformedResult = null;
-
private ErrorResultException transformedErrorResult = null;
/**
@@ -77,8 +75,11 @@
* {@inheritDoc}
*/
public final N get() throws ErrorResultException, InterruptedException {
- future.get();
-
+ try {
+ future.get();
+ } catch (ErrorResultException ignored) {
+ // Ignore untransformed error.
+ }
// The handlers are guaranteed to have been invoked at this point.
return get0();
}
@@ -88,8 +89,11 @@
*/
public final N get(final long timeout, final TimeUnit unit) throws ErrorResultException,
TimeoutException, InterruptedException {
- future.get(timeout, unit);
-
+ try {
+ future.get(timeout, unit);
+ } catch (ErrorResultException ignored) {
+ // Ignore untransformed error.
+ }
// The handlers are guaranteed to have been invoked at this point.
return get0();
}
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/TimeSource.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/TimeSource.java
new file mode 100644
index 0000000..4c9a368
--- /dev/null
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/TimeSource.java
@@ -0,0 +1,52 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS.
+ */
+
+package com.forgerock.opendj.util;
+
+/**
+ * An interface which is used by various components in order to obtain the
+ * current time.
+ */
+public interface TimeSource {
+ /**
+ * Default implementation which delegates to
+ * {@link System#currentTimeMillis()}.
+ */
+ public static final TimeSource DEFAULT = new TimeSource() {
+
+ @Override
+ public long currentTimeMillis() {
+ return System.currentTimeMillis();
+ }
+ };
+
+ /**
+ * Returns the current time in milli-seconds.
+ *
+ * @return The current time in milli-seconds.
+ */
+ long currentTimeMillis();
+}
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
index 48511a7..c2176e9 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -35,7 +35,6 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -67,6 +66,7 @@
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.CompletedFutureResult;
import com.forgerock.opendj.util.ReferenceCountedObject;
+import com.forgerock.opendj.util.TimeSource;
import com.forgerock.opendj.util.Validator;
/**
@@ -552,7 +552,7 @@
* since we don't want to hold the lock too long.
*/
idleConnections = new LinkedList<Connection>();
- final long timeoutMillis = currentTimeMillis() - idleTimeoutMillis;
+ final long timeoutMillis = timeSource.currentTimeMillis() - idleTimeoutMillis;
int nonCoreConnectionCount = currentPoolSize() - corePoolSize;
for (QueueElement holder = queue.peek(); nonCoreConnectionCount > 0
&& isTimedOutQueuedConnection(holder, timeoutMillis); holder = queue.peek()) {
@@ -631,10 +631,10 @@
}
/**
- * This is intended for unit testing only in order to inject fake time
- * stamps. Use System.currentTimeMillis() when null.
+ * This is package private in order to allow unit tests to inject fake time
+ * stamps.
*/
- Callable<Long> testTimeSource = null;
+ TimeSource timeSource = TimeSource.DEFAULT;
private final Semaphore availableConnections;
private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
@@ -740,7 +740,7 @@
} else if (hasWaitingConnections()) {
holder = queue.removeFirst();
} else {
- holder = new QueueElement(handler, currentTimeMillis());
+ holder = new QueueElement(handler, timeSource.currentTimeMillis());
queue.add(holder);
}
}
@@ -802,23 +802,6 @@
return maxPoolSize - availableConnections.availablePermits();
}
- /*
- * This method delegates to System.currentTimeMillis() except in unit tests
- * where we use injected times.
- */
- private long currentTimeMillis() {
- if (testTimeSource == null) {
- return System.currentTimeMillis();
- } else {
- try {
- return testTimeSource.call();
- } catch (final Exception e) {
- // Should not happen.
- throw new RuntimeException(e);
- }
- }
- }
-
private boolean hasWaitingConnections() {
return !queue.isEmpty() && !queue.getFirst().isWaitingFuture();
}
@@ -839,7 +822,7 @@
connectionPoolIsClosing = true;
holder = null;
} else {
- holder = new QueueElement(connection, currentTimeMillis());
+ holder = new QueueElement(connection, timeSource.currentTimeMillis());
queue.add(holder);
return;
}
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
index 985c887..3b0b6df 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -245,7 +245,8 @@
* Creates a new heart-beat connection factory which will create connections
* using the provided connection factory and periodically ping any created
* connections in order to detect that they are still alive every 10 seconds
- * using the default scheduler.
+ * using the default scheduler. Connections will be marked as having failed
+ * if a heart-beat takes longer than 500ms.
*
* @param factory
* The connection factory to use for creating connections.
@@ -254,7 +255,8 @@
* If {@code factory} was {@code null}.
*/
public static ConnectionFactory newHeartBeatConnectionFactory(final ConnectionFactory factory) {
- return new HeartBeatConnectionFactory(factory);
+ return new HeartBeatConnectionFactory(factory, 10000, 500, TimeUnit.MILLISECONDS, null,
+ null);
}
/**
@@ -267,6 +269,9 @@
* The connection factory to use for creating connections.
* @param interval
* The interval between keepalive pings.
+ * @param timeout
+ * The heart-beat timeout after which a connection will be marked
+ * as failed.
* @param unit
* The time unit for the interval between keepalive pings.
* @return The new heart-beat connection factory.
@@ -276,8 +281,8 @@
* If {@code factory} or {@code unit} was {@code null}.
*/
public static ConnectionFactory newHeartBeatConnectionFactory(final ConnectionFactory factory,
- final long interval, final TimeUnit unit) {
- return new HeartBeatConnectionFactory(factory, interval, unit);
+ final long interval, final long timeout, final TimeUnit unit) {
+ return new HeartBeatConnectionFactory(factory, interval, timeout, unit, null, null);
}
/**
@@ -290,6 +295,9 @@
* The connection factory to use for creating connections.
* @param interval
* The interval between keepalive pings.
+ * @param timeout
+ * The heart-beat timeout after which a connection will be marked
+ * as failed.
* @param unit
* The time unit for the interval between keepalive pings.
* @param heartBeat
@@ -302,8 +310,9 @@
* {@code null}.
*/
public static ConnectionFactory newHeartBeatConnectionFactory(final ConnectionFactory factory,
- final long interval, final TimeUnit unit, final SearchRequest heartBeat) {
- return new HeartBeatConnectionFactory(factory, interval, unit, heartBeat);
+ final long interval, final long timeout, final TimeUnit unit,
+ final SearchRequest heartBeat) {
+ return new HeartBeatConnectionFactory(factory, interval, timeout, unit, heartBeat, null);
}
/**
@@ -316,6 +325,9 @@
* The connection factory to use for creating connections.
* @param interval
* The interval between keepalive pings.
+ * @param timeout
+ * The heart-beat timeout after which a connection will be marked
+ * as failed.
* @param unit
* The time unit for the interval between keepalive pings.
* @param heartBeat
@@ -331,9 +343,10 @@
* {@code null}.
*/
public static ConnectionFactory newHeartBeatConnectionFactory(final ConnectionFactory factory,
- final long interval, final TimeUnit unit, final SearchRequest heartBeat,
- final ScheduledExecutorService scheduler) {
- return new HeartBeatConnectionFactory(factory, interval, unit, heartBeat, scheduler);
+ final long interval, final long timeout, final TimeUnit unit,
+ final SearchRequest heartBeat, final ScheduledExecutorService scheduler) {
+ return new HeartBeatConnectionFactory(factory, interval, timeout, unit, heartBeat,
+ scheduler);
}
/**
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ErrorResultException.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ErrorResultException.java
index e9cf922..45949d3 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ErrorResultException.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ErrorResultException.java
@@ -72,7 +72,7 @@
* If {@code resultCode} was {@code null}.
*/
public static ErrorResultException newErrorResult(ResultCode resultCode,
- String diagnosticMessage) {
+ CharSequence diagnosticMessage) {
return newErrorResult(resultCode, diagnosticMessage, null);
}
@@ -114,10 +114,10 @@
* If {@code resultCode} was {@code null}.
*/
public static ErrorResultException newErrorResult(ResultCode resultCode,
- String diagnosticMessage, Throwable cause) {
+ CharSequence diagnosticMessage, Throwable cause) {
final Result result = Responses.newResult(resultCode);
if (diagnosticMessage != null) {
- result.setDiagnosticMessage(diagnosticMessage);
+ result.setDiagnosticMessage(diagnosticMessage.toString());
} else if (cause != null) {
result.setDiagnosticMessage(cause.getLocalizedMessage());
}
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
index 669195f..89c115f 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -27,23 +27,27 @@
package org.forgerock.opendj.ldap;
-import static com.forgerock.opendj.util.StaticUtils.*;
-import static java.lang.System.*;
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
+import static org.forgerock.opendj.ldap.CoreMessages.HBCF_CONNECTION_CLOSED_BY_CLIENT;
+import static org.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_FAILED;
+import static org.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_TIMEOUT;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
-import static org.forgerock.opendj.ldap.ErrorResultException.*;
-
-import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.logging.Level;
+import org.forgerock.opendj.ldap.requests.AbandonRequest;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
@@ -54,30 +58,297 @@
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.StartTLSExtendedRequest;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
-import org.forgerock.opendj.ldap.responses.GenericExtendedResult;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
-import org.forgerock.opendj.ldif.ConnectionEntryReader;
+import com.forgerock.opendj.ldap.ConnectionState;
import com.forgerock.opendj.util.AsynchronousFutureResult;
+import com.forgerock.opendj.util.CompletedFutureResult;
import com.forgerock.opendj.util.FutureResultTransformer;
+import com.forgerock.opendj.util.RecursiveFutureResult;
import com.forgerock.opendj.util.ReferenceCountedObject;
+import com.forgerock.opendj.util.TimeSource;
import com.forgerock.opendj.util.Validator;
/**
* An heart beat connection factory can be used to create connections that sends
* a periodic search request to a Directory Server.
+ * <p>
+ * Before returning new connections to the application this factory will first
+ * send an initial heart beat in order to determine that the remote server is
+ * responsive. If the heart beat fails or times out then the connection is
+ * closed immediately and an error returned to the client.
+ * <p>
+ * Once a connection has been established successfully (including the initial
+ * heart beat), this factory will periodically send heart beats on the
+ * connection based on the configured heart beat interval. If the heart beat
+ * times out then the server is assumed to be down and an appropriate
+ * {@link ConnectionException} generated and published to any registered
+ * {@link ConnectionEventListener}s. Note however, that heart beats will only be
+ * sent when the connection is determined to be reasonably idle: there is no
+ * point in sending heart beats if the connection has recently received a
+ * response. A connection is deemed to be idle if no response has been received
+ * during a period equivalent to half the heart beat interval.
+ * <p>
+ * The LDAP protocol specifically precludes clients from performing operations
+ * while bind or startTLS requests are being performed. Likewise, a bind or
+ * startTLS request will cause active operations to be aborted. This factory
+ * coordinates heart beats with bind or startTLS requests, ensuring that they
+ * are not performed concurrently. Specifically, bind and startTLS requests are
+ * queued up while a heart beat is pending, and heart beats are not sent at all
+ * while there are pending bind or startTLS requests.
*/
final class HeartBeatConnectionFactory implements ConnectionFactory {
+
+ /**
+ * This class is responsible for performing an initial heart beat once a new
+ * connection has been obtained and, if the heart beat succeeds, adapting it
+ * to a {@code ConnectionImpl} and registering it in the table of valid
+ * connections.
+ */
+ private final class ConnectionFutureResultImpl {
+
+ /**
+ * This class handles the initial heart beat result notification or
+ * timeout. We need to take care to avoid processing multiple results,
+ * which may occur when the heart beat is timed out and a result follows
+ * soon after, or vice versa.
+ */
+ private final class InitialHeartBeatResultHandler implements SearchResultHandler, Runnable {
+ private final ResultHandler<? super Result> handler;
+
+ /**
+ * Due to a potential race between the heart beat timing out and the
+ * heart beat completing this atomic ensures that notification only
+ * occurs once.
+ */
+ private final AtomicBoolean isComplete = new AtomicBoolean();
+
+ private InitialHeartBeatResultHandler(final ResultHandler<? super Result> handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public boolean handleEntry(final SearchResultEntry entry) {
+ /*
+ * Depending on the configuration, a heartbeat may return some
+ * entries. However, we can just ignore them.
+ */
+ return true;
+ }
+
+ @Override
+ public void handleErrorResult(final ErrorResultException error) {
+ if (isComplete.compareAndSet(false, true)) {
+ handler.handleErrorResult(error);
+ }
+ }
+
+ @Override
+ public boolean handleReference(final SearchResultReference reference) {
+ /*
+ * Depending on the configuration, a heartbeat may return some
+ * references. However, we can just ignore them.
+ */
+ return true;
+ }
+
+ @Override
+ public void handleResult(final Result result) {
+ if (isComplete.compareAndSet(false, true)) {
+ handler.handleResult(result);
+ }
+ }
+
+ /*
+ * Invoked by the scheduler when the heart beat times out.
+ */
+ @Override
+ public void run() {
+ handleErrorResult(newHeartBeatTimeoutError());
+ }
+ }
+
+ private Connection connection;
+ private final RecursiveFutureResult<Connection, Result> futureConnectionResult;
+ private final FutureResultTransformer<Result, Connection> futureSearchResult;
+
+ private ConnectionFutureResultImpl(final ResultHandler<? super Connection> handler) {
+ // Create a future which will handle the initial heart beat result.
+ this.futureSearchResult = new FutureResultTransformer<Result, Connection>(handler) {
+
+ @Override
+ protected ErrorResultException transformErrorResult(
+ final ErrorResultException errorResult) {
+ // Ensure that the connection is closed.
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ return adaptHeartBeatError(errorResult);
+ }
+
+ @Override
+ protected Connection transformResult(final Result result)
+ throws ErrorResultException {
+ return adaptConnection(connection);
+ }
+
+ };
+
+ // Create a future which will handle connection result.
+ this.futureConnectionResult =
+ new RecursiveFutureResult<Connection, Result>(futureSearchResult) {
+
+ @Override
+ protected FutureResult<? extends Result> chainResult(
+ final Connection innerResult,
+ final ResultHandler<? super Result> handler)
+ throws ErrorResultException {
+ // Save the connection for later once the heart beat completes.
+ connection = innerResult;
+
+ /*
+ * Send the initial heart beat and schedule a client
+ * side timeout notification.
+ */
+ final InitialHeartBeatResultHandler wrappedHandler =
+ new InitialHeartBeatResultHandler(handler);
+ scheduler.get().schedule(wrappedHandler, timeoutMS,
+ TimeUnit.MILLISECONDS);
+ return connection.searchAsync(heartBeatRequest, null, wrappedHandler);
+ }
+ };
+
+ // Link the two futures.
+ futureSearchResult.setFutureResult(futureConnectionResult);
+ }
+
+ }
+
/**
* A connection that sends heart beats and supports all operations.
*/
- private final class ConnectionImpl extends AbstractConnectionWrapper<Connection> implements
- ConnectionEventListener, SearchResultHandler {
+ private final class ConnectionImpl extends AbstractAsynchronousConnection implements
+ ConnectionEventListener {
+
+ /**
+ * A result handler wrapper for operations which timestamps the
+ * connection for each response received. The wrapper ensures that
+ * completed requests are removed from the {@code pendingResults} queue,
+ * as well as ensuring that requests are only completed once.
+ */
+ private abstract class AbstractWrappedResultHandler<R, H extends ResultHandler<? super R>>
+ implements ResultHandler<R>, FutureResult<R> {
+ /** The user provided result handler. */
+ protected final H handler;
+
+ private final CountDownLatch completed = new CountDownLatch(1);
+ private ErrorResultException error;
+ private FutureResult<R> innerFuture;
+ private R result;
+
+ AbstractWrappedResultHandler(final H handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ return innerFuture.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public R get() throws ErrorResultException, InterruptedException {
+ completed.await();
+ return get0();
+ }
+
+ @Override
+ public R get(final long timeout, final TimeUnit unit) throws ErrorResultException,
+ TimeoutException, InterruptedException {
+ if (completed.await(timeout, unit)) {
+ return get0();
+ } else {
+ throw new TimeoutException();
+ }
+ }
+
+ @Override
+ public int getRequestID() {
+ return innerFuture.getRequestID();
+ }
+
+ @Override
+ public void handleErrorResult(final ErrorResultException error) {
+ if (tryComplete(null, error)) {
+ if (handler != null) {
+ handler.handleErrorResult(timestamp(error));
+ } else {
+ timestamp(error);
+ }
+ }
+ }
+
+ @Override
+ public void handleResult(final R result) {
+ if (tryComplete(result, null)) {
+ if (handler != null) {
+ handler.handleResult(timestamp(result));
+ } else {
+ timestamp(result);
+ }
+ }
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return innerFuture.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return completed.getCount() == 0;
+ }
+
+ abstract void releaseBindOrStartTLSLockIfNeeded();
+
+ FutureResult<R> setInnerFuture(final FutureResult<R> innerFuture) {
+ this.innerFuture = innerFuture;
+ return this;
+ }
+
+ private R get0() throws ErrorResultException {
+ if (result != null) {
+ return result;
+ } else {
+ throw error;
+ }
+ }
+
+ /**
+ * Attempts to complete this request, returning true if successful.
+ * This method is synchronized in order to avoid race conditions
+ * with search result processing.
+ */
+ private synchronized boolean tryComplete(final R result,
+ final ErrorResultException error) {
+ if (pendingResults.remove(this)) {
+ this.result = result;
+ this.error = error;
+ completed.countDown();
+ releaseBindOrStartTLSLockIfNeeded();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ }
/**
* Runs pending request once the shared lock becomes available (when no
@@ -120,161 +391,246 @@
}
return null;
}
-
}
- /*
- * List of pending Bind or StartTLS requests which must be invoked when
+ /**
+ * A result handler wrapper for bind or startTLS requests which releases
+ * the bind/startTLS lock on completion.
+ */
+ private final class WrappedBindOrStartTLSResultHandler<R> extends
+ AbstractWrappedResultHandler<R, ResultHandler<? super R>> {
+ WrappedBindOrStartTLSResultHandler(final ResultHandler<? super R> handler) {
+ super(handler);
+ }
+
+ @Override
+ void releaseBindOrStartTLSLockIfNeeded() {
+ releaseBindOrStartTLSLock();
+ }
+ }
+
+ /**
+ * A result handler wrapper for normal requests which does not release
+ * the bind/startTLS lock on completion.
+ */
+ private final class WrappedResultHandler<R> extends
+ AbstractWrappedResultHandler<R, ResultHandler<? super R>> {
+ WrappedResultHandler(final ResultHandler<? super R> handler) {
+ super(handler);
+ }
+
+ @Override
+ void releaseBindOrStartTLSLockIfNeeded() {
+ // No-op for normal operations.
+ }
+ }
+
+ /**
+ * A result handler wrapper for search operations. Ensures that search
+ * results are not sent once the request has been completed (see
+ * markComplete()).
+ */
+ private final class WrappedSearchResultHandler extends
+ AbstractWrappedResultHandler<Result, SearchResultHandler> implements
+ SearchResultHandler {
+ WrappedSearchResultHandler(final SearchResultHandler handler) {
+ super(handler);
+ }
+
+ @Override
+ public synchronized boolean handleEntry(final SearchResultEntry entry) {
+ if (!isDone()) {
+ if (handler != null) {
+ handler.handleEntry(timestamp(entry));
+ } else {
+ timestamp(entry);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public synchronized boolean handleReference(final SearchResultReference reference) {
+ if (!isDone()) {
+ if (handler != null) {
+ handler.handleReference(timestamp(reference));
+ } else {
+ timestamp(reference);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ void releaseBindOrStartTLSLockIfNeeded() {
+ // No-op for normal operations.
+ }
+ }
+
+ /** The wrapped connection. */
+ private final Connection connection;
+
+ /**
+ * Search result handler for processing heart beat responses.
+ */
+ private final SearchResultHandler heartBeatHandler = new SearchResultHandler() {
+ @Override
+ public boolean handleEntry(final SearchResultEntry entry) {
+ timestamp(entry);
+ return true;
+ }
+
+ @Override
+ public void handleErrorResult(final ErrorResultException error) {
+ /*
+ * Connection failure will be handled by connection event
+ * listener. Ignore cancellation errors since these indicate
+ * that the heart beat was aborted by a client-side close.
+ */
+ if (!(error instanceof CancelledResultException)) {
+ if (DEBUG_LOG.isLoggable(Level.WARNING)) {
+ DEBUG_LOG.warning(String.format(
+ "Heartbeat failed for connection factory '%s': %s", factory, error
+ .getMessage()));
+ }
+ timestamp(error);
+ }
+ releaseHeartBeatLock();
+ }
+
+ @Override
+ public boolean handleReference(final SearchResultReference reference) {
+ timestamp(reference);
+ return true;
+ }
+
+ @Override
+ public void handleResult(final Result result) {
+ timestamp(result);
+ releaseHeartBeatLock();
+ }
+ };
+
+ /**
+ * List of pending Bind or StartTLS requests which must be invoked once
* the current heart beat completes.
*/
- private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>();
+ private final Queue<Runnable> pendingBindOrStartTLSRequests =
+ new ConcurrentLinkedQueue<Runnable>();
- /* Coordinates heart-beats with Bind and StartTLS requests. */
+ /**
+ * List of pending responses for all active operations. These will be
+ * signalled if no heart beat is detected within the permitted timeout
+ * period.
+ */
+ private final Queue<AbstractWrappedResultHandler<?, ?>> pendingResults =
+ new ConcurrentLinkedQueue<AbstractWrappedResultHandler<?, ?>>();
+
+ /** Internal connection state. */
+ private final ConnectionState state = new ConnectionState();
+
+ /** Coordinates heart-beats with Bind and StartTLS requests. */
private final Sync sync = new Sync();
- /*
+ /**
* Timestamp of last response received (any response, not just heart
* beats).
*/
- private volatile long timestamp = currentTimeMillis(); // Assume valid at creation.
+ private volatile long lastResponseTimestamp = timeSource.currentTimeMillis(); // Assume valid at creation.
private ConnectionImpl(final Connection connection) {
- super(connection);
+ this.connection = connection;
+ connection.addConnectionEventListener(this);
}
@Override
- public Result add(final AddRequest request) throws ErrorResultException {
- try {
- return timestamp(connection.add(request));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
- }
-
- @Override
- public Result add(final Entry entry) throws ErrorResultException {
- try {
- return timestamp(connection.add(entry));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
- }
-
- @Override
- public Result add(final String... ldifLines) throws ErrorResultException {
- try {
- return timestamp(connection.add(ldifLines));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
+ public FutureResult<Void> abandonAsync(final AbandonRequest request) {
+ return connection.abandonAsync(request);
}
@Override
public FutureResult<Result> addAsync(final AddRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super Result> resultHandler) {
- return connection.addAsync(request, intermediateResponseHandler,
- timestamper(resultHandler));
- }
-
- @Override
- public BindResult bind(final BindRequest request) throws ErrorResultException {
- acquireBindOrStartTLSLock();
- try {
- return timestamp(connection.bind(request));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- } finally {
- releaseBindOrStartTLSLock();
+ if (checkState(resultHandler)) {
+ final WrappedResultHandler<Result> h = wrap(resultHandler);
+ return checkState(connection.addAsync(request, intermediateResponseHandler, h), h);
+ } else {
+ return newConnectionErrorFuture();
}
}
@Override
- public BindResult bind(final String name, final char[] password)
- throws ErrorResultException {
- acquireBindOrStartTLSLock();
- try {
- return timestamp(connection.bind(name, password));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- } finally {
- releaseBindOrStartTLSLock();
- }
+ public void addConnectionEventListener(final ConnectionEventListener listener) {
+ state.addConnectionEventListener(listener);
}
@Override
public FutureResult<BindResult> bindAsync(final BindRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super BindResult> resultHandler) {
- if (sync.tryLockShared()) {
- // Fast path
- return connection.bindAsync(request, intermediateResponseHandler, timestamper(
- resultHandler, true));
+ if (checkState(resultHandler)) {
+ if (sync.tryLockShared()) {
+ // Fast path
+ final WrappedBindOrStartTLSResultHandler<BindResult> h =
+ wrapForBindOrStartTLS(resultHandler);
+ return checkState(
+ connection.bindAsync(request, intermediateResponseHandler, h), h);
+ } else {
+ /*
+ * A heart beat must be in progress so create a runnable
+ * task which will be executed when the heart beat
+ * completes.
+ */
+ final DelayedFuture<BindResult> future =
+ new DelayedFuture<BindResult>(resultHandler) {
+ @Override
+ public FutureResult<BindResult> dispatch() {
+ final WrappedBindOrStartTLSResultHandler<BindResult> h =
+ wrapForBindOrStartTLS(this);
+ return checkState(connection.bindAsync(request,
+ intermediateResponseHandler, h), h);
+ }
+ };
+ /*
+ * Enqueue and flush if the heart beat has completed in the
+ * mean time.
+ */
+ pendingBindOrStartTLSRequests.offer(future);
+ flushPendingBindOrStartTLSRequests();
+ return future;
+ }
} else {
- /*
- * A heart beat must be in progress so create a runnable task
- * which will be executed when the heart beat completes.
- */
- final DelayedFuture<BindResult> future =
- new DelayedFuture<BindResult>(resultHandler) {
- @Override
- public FutureResult<BindResult> dispatch() {
- return connection.bindAsync(request, intermediateResponseHandler,
- timestamper(this, true));
- }
- };
- /*
- * Enqueue and flush if the heart beat has completed in the mean
- * time.
- */
- pendingRequests.offer(future);
- flushPendingRequests();
- return future;
+ return newConnectionErrorFuture();
}
}
@Override
- public CompareResult compare(final CompareRequest request) throws ErrorResultException {
- try {
- return timestamp(connection.compare(request));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
+ public void close() {
+ handleConnectionClosed();
+ connection.close();
}
@Override
- public CompareResult compare(final String name, final String attributeDescription,
- final String assertionValue) throws ErrorResultException {
- try {
- return timestamp(connection.compare(name, attributeDescription, assertionValue));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
+ public void close(final UnbindRequest request, final String reason) {
+ handleConnectionClosed();
+ connection.close(request, reason);
}
@Override
public FutureResult<CompareResult> compareAsync(final CompareRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super CompareResult> resultHandler) {
- return connection.compareAsync(request, intermediateResponseHandler,
- timestamper(resultHandler));
- }
-
- @Override
- public Result delete(final DeleteRequest request) throws ErrorResultException {
- try {
- return timestamp(connection.delete(request));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
- }
-
- @Override
- public Result delete(final String name) throws ErrorResultException {
- try {
- return timestamp(connection.delete(name));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
+ if (checkState(resultHandler)) {
+ final WrappedResultHandler<CompareResult> h = wrap(resultHandler);
+ return checkState(connection.compareAsync(request, intermediateResponseHandler, h),
+ h);
+ } else {
+ return newConnectionErrorFuture();
}
}
@@ -282,61 +638,12 @@
public FutureResult<Result> deleteAsync(final DeleteRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super Result> resultHandler) {
- return connection.deleteAsync(request, intermediateResponseHandler,
- timestamper(resultHandler));
- }
-
- @Override
- public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request)
- throws ErrorResultException {
- final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
- if (isStartTLS) {
- acquireBindOrStartTLSLock();
- }
- try {
- return timestamp(connection.extendedRequest(request));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- } finally {
- if (isStartTLS) {
- releaseBindOrStartTLSLock();
- }
- }
- }
-
- @Override
- public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request,
- final IntermediateResponseHandler handler) throws ErrorResultException {
- final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
- if (isStartTLS) {
- acquireBindOrStartTLSLock();
- }
- try {
- return timestamp(connection.extendedRequest(request, handler));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- } finally {
- if (isStartTLS) {
- releaseBindOrStartTLSLock();
- }
- }
- }
-
- @Override
- public GenericExtendedResult extendedRequest(final String requestName,
- final ByteString requestValue) throws ErrorResultException {
- final boolean isStartTLS = requestName.equals(StartTLSExtendedRequest.OID);
- if (isStartTLS) {
- acquireBindOrStartTLSLock();
- }
- try {
- return timestamp(connection.extendedRequest(requestName, requestValue));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- } finally {
- if (isStartTLS) {
- releaseBindOrStartTLSLock();
- }
+ if (checkState(resultHandler)) {
+ final WrappedResultHandler<Result> h = wrap(resultHandler);
+ return checkState(connection.deleteAsync(request, intermediateResponseHandler, h),
+ h);
+ } else {
+ return newConnectionErrorFuture();
}
}
@@ -345,129 +652,101 @@
final ExtendedRequest<R> request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super R> resultHandler) {
- final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
- if (isStartTLS) {
- if (sync.tryLockShared()) {
- // Fast path
- return connection.extendedRequestAsync(request, intermediateResponseHandler,
- timestamper(resultHandler, true));
- } else {
- /*
- * A heart beat must be in progress so create a runnable
- * task which will be executed when the heart beat
- * completes.
- */
- final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
- @Override
- public FutureResult<R> dispatch() {
- return connection.extendedRequestAsync(request,
- intermediateResponseHandler, timestamper(this, true));
- }
- };
+ if (checkState(resultHandler)) {
+ if (isStartTLSRequest(request)) {
+ if (sync.tryLockShared()) {
+ // Fast path
+ final WrappedBindOrStartTLSResultHandler<R> h =
+ wrapForBindOrStartTLS(resultHandler);
+ return checkState(connection.extendedRequestAsync(request,
+ intermediateResponseHandler, h), h);
+ } else {
+ /*
+ * A heart beat must be in progress so create a runnable
+ * task which will be executed when the heart beat
+ * completes.
+ */
+ final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
+ @Override
+ public FutureResult<R> dispatch() {
+ final WrappedBindOrStartTLSResultHandler<R> h =
+ wrapForBindOrStartTLS(this);
+ return checkState(connection.extendedRequestAsync(request,
+ intermediateResponseHandler, h), h);
+ }
+ };
- /*
- * Enqueue and flush if the heart beat has completed in the
- * mean time.
- */
- pendingRequests.offer(future);
- flushPendingRequests();
- return future;
+ /*
+ * Enqueue and flush if the heart beat has completed in
+ * the mean time.
+ */
+ pendingBindOrStartTLSRequests.offer(future);
+ flushPendingBindOrStartTLSRequests();
+ return future;
+ }
+ } else {
+ final WrappedResultHandler<R> h = wrap(resultHandler);
+ return checkState(connection.extendedRequestAsync(request,
+ intermediateResponseHandler, h), h);
}
} else {
- return connection.extendedRequestAsync(request, intermediateResponseHandler,
- timestamper(resultHandler));
+ return newConnectionErrorFuture();
}
}
@Override
public void handleConnectionClosed() {
- notifyClosed();
+ if (state.notifyConnectionClosed()) {
+ failPendingResults(newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED,
+ HBCF_CONNECTION_CLOSED_BY_CLIENT.get()));
+ synchronized (validConnections) {
+ connection.removeConnectionEventListener(this);
+ validConnections.remove(this);
+ if (validConnections.isEmpty()) {
+ /*
+ * This is the last active connection, so stop the
+ * heartbeat.
+ */
+ heartBeatFuture.cancel(false);
+ }
+ }
+ }
}
@Override
public void handleConnectionError(final boolean isDisconnectNotification,
final ErrorResultException error) {
- notifyClosed();
- }
-
- @Override
- public boolean handleEntry(final SearchResultEntry entry) {
- updateTimestamp();
- return true;
- }
-
- @Override
- public void handleErrorResult(final ErrorResultException error) {
- if (DEBUG_LOG.isLoggable(Level.FINE)) {
- DEBUG_LOG.fine(String.format("Heartbeat failed: %s", error.getMessage()));
+ if (state.notifyConnectionError(isDisconnectNotification, error)) {
+ failPendingResults(error);
}
- updateTimestamp();
- releaseHeartBeatLock();
- }
-
- @Override
- public boolean handleReference(final SearchResultReference reference) {
- updateTimestamp();
- return true;
- }
-
- @Override
- public void handleResult(final Result result) {
- updateTimestamp();
- releaseHeartBeatLock();
}
@Override
public void handleUnsolicitedNotification(final ExtendedResult notification) {
- updateTimestamp();
+ timestamp(notification);
+ state.notifyUnsolicitedNotification(notification);
+ }
+
+ @Override
+ public boolean isClosed() {
+ return state.isClosed();
}
@Override
public boolean isValid() {
- return connection.isValid() && currentTimeMillis() < (timestamp + timeoutMS);
- }
-
- @Override
- public Result modify(final ModifyRequest request) throws ErrorResultException {
- try {
- return timestamp(connection.modify(request));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
- }
-
- @Override
- public Result modify(final String... ldifLines) throws ErrorResultException {
- try {
- return timestamp(connection.modify(ldifLines));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
+ return state.isValid() && connection.isValid();
}
@Override
public FutureResult<Result> modifyAsync(final ModifyRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super Result> resultHandler) {
- return connection.modifyAsync(request, intermediateResponseHandler,
- timestamper(resultHandler));
- }
-
- @Override
- public Result modifyDN(final ModifyDNRequest request) throws ErrorResultException {
- try {
- return timestamp(connection.modifyDN(request));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
- }
-
- @Override
- public Result modifyDN(final String name, final String newRDN) throws ErrorResultException {
- try {
- return timestamp(connection.modifyDN(name, newRDN));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
+ if (checkState(resultHandler)) {
+ final WrappedResultHandler<Result> h = wrap(resultHandler);
+ return checkState(connection.modifyAsync(request, intermediateResponseHandler, h),
+ h);
+ } else {
+ return newConnectionErrorFuture();
}
}
@@ -475,121 +754,34 @@
public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super Result> resultHandler) {
- return connection.modifyDNAsync(request, intermediateResponseHandler,
- timestamper(resultHandler));
- }
-
- @Override
- public SearchResultEntry readEntry(final DN name, final String... attributeDescriptions)
- throws ErrorResultException {
- try {
- return timestamp(connection.readEntry(name, attributeDescriptions));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
+ if (checkState(resultHandler)) {
+ final WrappedResultHandler<Result> h = wrap(resultHandler);
+ return checkState(
+ connection.modifyDNAsync(request, intermediateResponseHandler, h), h);
+ } else {
+ return newConnectionErrorFuture();
}
}
@Override
- public SearchResultEntry readEntry(final String name, final String... attributeDescriptions)
- throws ErrorResultException {
- try {
- return timestamp(connection.readEntry(name, attributeDescriptions));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
- }
-
- @Override
- public FutureResult<SearchResultEntry> readEntryAsync(final DN name,
- final Collection<String> attributeDescriptions,
- final ResultHandler<? super SearchResultEntry> handler) {
- return connection.readEntryAsync(name, attributeDescriptions, timestamper(handler));
- }
-
- @Override
- public ConnectionEntryReader search(final SearchRequest request) {
- // Ensure that search results update timestamp.
- return new ConnectionEntryReader(this, request);
- }
-
- @Override
- public Result search(final SearchRequest request,
- final Collection<? super SearchResultEntry> entries) throws ErrorResultException {
- try {
- return timestamp(connection.search(request, entries));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
- }
-
- @Override
- public Result search(final SearchRequest request,
- final Collection<? super SearchResultEntry> entries,
- final Collection<? super SearchResultReference> references)
- throws ErrorResultException {
- try {
- return timestamp(connection.search(request, entries, references));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
- }
-
- @Override
- public Result search(final SearchRequest request, final SearchResultHandler handler)
- throws ErrorResultException {
- try {
- return connection.search(request, timestamper(handler));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
- }
-
- @Override
- public ConnectionEntryReader search(final String baseObject, final SearchScope scope,
- final String filter, final String... attributeDescriptions) {
- // Ensure that search results update timestamp.
- final SearchRequest request =
- Requests.newSearchRequest(baseObject, scope, filter, attributeDescriptions);
- return new ConnectionEntryReader(this, request);
+ public void removeConnectionEventListener(final ConnectionEventListener listener) {
+ state.removeConnectionEventListener(listener);
}
@Override
public FutureResult<Result> searchAsync(final SearchRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final SearchResultHandler resultHandler) {
- return connection.searchAsync(request, intermediateResponseHandler,
- timestamper(resultHandler));
- }
-
- @Override
- public SearchResultEntry searchSingleEntry(final SearchRequest request)
- throws ErrorResultException {
- try {
- return timestamp(connection.searchSingleEntry(request));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
+ if (checkState(resultHandler)) {
+ final WrappedSearchResultHandler h = wrap(resultHandler);
+ return checkState(connection.searchAsync(request, intermediateResponseHandler, h),
+ h);
+ } else {
+ return newConnectionErrorFuture();
}
}
@Override
- public SearchResultEntry searchSingleEntry(final String baseObject,
- final SearchScope scope, final String filter, final String... attributeDescriptions)
- throws ErrorResultException {
- try {
- return timestamp(connection.searchSingleEntry(baseObject, scope, filter,
- attributeDescriptions));
- } catch (final ErrorResultException e) {
- throw timestamp(e);
- }
- }
-
- @Override
- public FutureResult<SearchResultEntry> searchSingleEntryAsync(final SearchRequest request,
- final ResultHandler<? super SearchResultEntry> handler) {
- return connection.searchSingleEntryAsync(request, timestamper(handler));
- }
-
- @Override
public String toString() {
final StringBuilder builder = new StringBuilder();
builder.append("HeartBeatConnection(");
@@ -598,24 +790,57 @@
return builder.toString();
}
- private void acquireBindOrStartTLSLock() throws ErrorResultException {
- /*
- * Wait for pending heartbeats and prevent new heartbeats from being
- * sent while the bind is in progress.
- */
- try {
- if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) {
- // Give up - it looks like the connection is dead.
- // FIXME: improve error message.
- throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN);
+ private void checkForHeartBeat() {
+ if (sync.isHeldExclusively()) {
+ /*
+ * A heart beat is still in progress, but it should have
+ * completed by now. Let's avoid aggressively terminating the
+ * connection, because the heart beat may simply have been
+ * delayed by a sudden surge of activity. Therefore, only flag
+ * the connection as failed if no activity has been seen on the
+ * connection since the heart beat was sent.
+ */
+ final long currentTimeMillis = timeSource.currentTimeMillis();
+ if (lastResponseTimestamp < (currentTimeMillis - timeoutMS)) {
+ if (DEBUG_LOG.isLoggable(Level.WARNING)) {
+ DEBUG_LOG.warning(String.format(
+ "No heartbeat detected for connection '%s'", connection));
+ }
+ handleConnectionError(false, newHeartBeatTimeoutError());
}
- } catch (final InterruptedException e) {
- throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
}
}
- private void flushPendingRequests() {
- if (!pendingRequests.isEmpty()) {
+ private <R> FutureResult<R> checkState(final FutureResult<R> future,
+ final AbstractWrappedResultHandler<R, ? extends ResultHandler<? super R>> h) {
+ h.setInnerFuture(future);
+ checkState(h);
+ return h;
+ }
+
+ private boolean checkState(final ResultHandler<?> h) {
+ final ErrorResultException error = state.getConnectionError();
+ if (error != null) {
+ h.handleErrorResult(error);
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ private void failPendingResults(final ErrorResultException error) {
+ /*
+ * Peek instead of pool because notification is responsible for
+ * removing the element from the queue.
+ */
+ AbstractWrappedResultHandler<?, ?> pendingResult;
+ while ((pendingResult = pendingResults.peek()) != null) {
+ pendingResult.handleErrorResult(error);
+ }
+ }
+
+ private void flushPendingBindOrStartTLSRequests() {
+ if (!pendingBindOrStartTLSRequests.isEmpty()) {
/*
* The pending requests will acquire the shared lock, but we
* take it here anyway to ensure that pending requests do not
@@ -624,7 +849,8 @@
if (sync.tryLockShared()) {
try {
Runnable pendingRequest;
- while ((pendingRequest = pendingRequests.poll()) != null) {
+ while ((pendingRequest = pendingBindOrStartTLSRequests.poll()) != null) {
+ // Dispatch the waiting request. This will not block.
pendingRequest.run();
}
} finally {
@@ -634,18 +860,12 @@
}
}
- private void notifyClosed() {
- synchronized (activeConnections) {
- connection.removeConnectionEventListener(this);
- activeConnections.remove(this);
- if (activeConnections.isEmpty()) {
- /*
- * This is the last active connection, so stop the
- * heartbeat.
- */
- heartBeatFuture.cancel(false);
- }
- }
+ private boolean isStartTLSRequest(final ExtendedRequest<?> request) {
+ return request.getOID().equals(StartTLSExtendedRequest.OID);
+ }
+
+ private <R> CompletedFutureResult<R> newConnectionErrorFuture() {
+ return new CompletedFutureResult<R>(state.getConnectionError());
}
private void releaseBindOrStartTLSLock() {
@@ -654,27 +874,43 @@
private void releaseHeartBeatLock() {
sync.unlockExclusively();
- flushPendingRequests();
+ flushPendingBindOrStartTLSRequests();
}
- private void sendHeartBeat() {
+ /**
+ * Sends a heart beat on this connection if required to do so.
+ *
+ * @return {@code true} if a heart beat was sent, otherwise
+ * {@code false}.
+ */
+ private boolean sendHeartBeat() {
/*
- * Only send the heartbeat if the connection has been idle for some
+ * Don't attempt to send a heart beat if the connection has already
+ * failed.
+ */
+ if (!state.isValid()) {
+ return false;
+ }
+
+ /*
+ * Only send the heart beat if the connection has been idle for some
* time.
*/
- if (currentTimeMillis() < (timestamp + minDelayMS)) {
- return;
+ final long currentTimeMillis = timeSource.currentTimeMillis();
+ if (currentTimeMillis < (lastResponseTimestamp + minDelayMS)) {
+ return false;
}
/*
* Don't send a heart beat if there is already a heart beat, bind,
* or startTLS in progress. Note that the bind/startTLS response
- * will update the timestamp as if it were a heart beat.
+ * will update the lastResponseTimestamp as if it were a heart beat.
*/
if (sync.tryLockExclusively()) {
try {
- connection.searchAsync(heartBeatRequest, null, this);
- } catch (final Exception e) {
+ connection.searchAsync(heartBeatRequest, null, heartBeatHandler);
+ return true;
+ } catch (final IllegalStateException e) {
/*
* This may happen when we attempt to send the heart beat
* just after the connection is closed but before we are
@@ -688,74 +924,34 @@
releaseHeartBeatLock();
}
}
+ return false;
}
private <R> R timestamp(final R response) {
- updateTimestamp();
+ if (!(response instanceof ConnectionException)) {
+ lastResponseTimestamp = timeSource.currentTimeMillis();
+ }
return response;
}
- private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler) {
- return timestamper(handler, false);
+ private <R> WrappedResultHandler<R> wrap(final ResultHandler<? super R> handler) {
+ final WrappedResultHandler<R> h = new WrappedResultHandler<R>(handler);
+ pendingResults.add(h);
+ return h;
}
- private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler,
- final boolean isBindOrStartTLS) {
- return new ResultHandler<R>() {
- @Override
- public void handleErrorResult(final ErrorResultException error) {
- releaseIfNeeded();
- if (handler != null) {
- handler.handleErrorResult(timestamp(error));
- } else {
- timestamp(error);
- }
- }
-
- @Override
- public void handleResult(final R result) {
- releaseIfNeeded();
- if (handler != null) {
- handler.handleResult(timestamp(result));
- } else {
- timestamp(result);
- }
- }
-
- private void releaseIfNeeded() {
- if (isBindOrStartTLS) {
- releaseBindOrStartTLSLock();
- }
- }
- };
+ private WrappedSearchResultHandler wrap(final SearchResultHandler handler) {
+ final WrappedSearchResultHandler h = new WrappedSearchResultHandler(handler);
+ pendingResults.add(h);
+ return h;
}
- private SearchResultHandler timestamper(final SearchResultHandler handler) {
- return new SearchResultHandler() {
- @Override
- public boolean handleEntry(final SearchResultEntry entry) {
- return handler.handleEntry(timestamp(entry));
- }
-
- @Override
- public void handleErrorResult(final ErrorResultException error) {
- handler.handleErrorResult(timestamp(error));
- }
-
- @Override
- public boolean handleReference(final SearchResultReference reference) {
- return handler.handleReference(timestamp(reference));
- }
-
- @Override
- public void handleResult(final Result result) {
- handler.handleResult(timestamp(result));
- }
- };
- }
-
- private void updateTimestamp() {
- timestamp = currentTimeMillis();
+ private <R> WrappedBindOrStartTLSResultHandler<R> wrapForBindOrStartTLS(
+ final ResultHandler<? super R> handler) {
+ final WrappedBindOrStartTLSResultHandler<R> h =
+ new WrappedBindOrStartTLSResultHandler<R>(handler);
+ pendingResults.add(h);
+ return h;
}
}
@@ -791,13 +987,13 @@
* </ul>
*/
private static final class Sync extends AbstractQueuedSynchronizer {
- /* Lock states. Positive values indicate that the shared lock is taken. */
- private static final int UNLOCKED = 0; // initial state
private static final int LOCKED_EXCLUSIVELY = -1;
-
// Keep compiler quiet.
private static final long serialVersionUID = -3590428415442668336L;
+ /* Lock states. Positive values indicate that the shared lock is taken. */
+ private static final int UNLOCKED = 0; // initial state
+
@Override
protected boolean isHeldExclusively() {
return getState() == LOCKED_EXCLUSIVELY;
@@ -866,10 +1062,6 @@
return tryAcquireShared(1) > 0;
}
- boolean tryLockShared(final long timeout, final TimeUnit unit) throws InterruptedException {
- return tryAcquireSharedNanos(1, unit.toNanos(timeout));
- }
-
void unlockExclusively() {
release(0 /* unused */);
}
@@ -880,60 +1072,122 @@
}
+ /**
+ * Default heart beat which will target the root DSE but not return any
+ * results.
+ */
private static final SearchRequest DEFAULT_SEARCH = Requests.newSearchRequest("",
- SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1");
+ SearchScope.BASE_OBJECT, "(objectClass=1.1)", "1.1");
- private final List<ConnectionImpl> activeConnections;
+ /**
+ * This is package private in order to allow unit tests to inject fake time
+ * stamps.
+ */
+ TimeSource timeSource = TimeSource.DEFAULT;
+
+ /**
+ * Scheduled task which checks that all heart beats have been received
+ * within the timeout period.
+ */
+ private final Runnable checkHeartBeatRunnable = new Runnable() {
+ @Override
+ public void run() {
+ for (final ConnectionImpl connection : getValidConnections()) {
+ connection.checkForHeartBeat();
+ }
+ }
+ };
+
+ /**
+ * Underlying connection factory.
+ */
private final ConnectionFactory factory;
+
+ /**
+ * The heartbeat scheduled future - which may be null if heartbeats are not
+ * being sent (no valid connections).
+ */
private ScheduledFuture<?> heartBeatFuture;
+
+ /**
+ * The heartbeat search request.
+ */
private final SearchRequest heartBeatRequest;
+
+ /**
+ * The interval between successive heartbeats.
+ */
private final long interval;
+ private final TimeUnit intervalUnit;
+
+ /**
+ * Flag which indicates whether this factory has been closed.
+ */
+ private final AtomicBoolean isClosed = new AtomicBoolean();
+
+ /**
+ * The minimum amount of time the connection should remain idle (no
+ * responses) before starting to send heartbeats.
+ */
private final long minDelayMS;
+ /**
+ * The heartbeat scheduler.
+ */
private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
+
+ /**
+ * Scheduled task which sends heart beats for all valid idle connections.
+ */
+ private final Runnable sendHeartBeatRunnable = new Runnable() {
+ @Override
+ public void run() {
+ boolean heartBeatSent = false;
+ for (final ConnectionImpl connection : getValidConnections()) {
+ heartBeatSent |= connection.sendHeartBeat();
+ }
+ if (heartBeatSent) {
+ scheduler.get().schedule(checkHeartBeatRunnable, timeoutMS, TimeUnit.MILLISECONDS);
+ }
+ }
+ };
+
+ /**
+ * The heartbeat timeout in milli-seconds. The connection will be marked as
+ * failed if no heartbeat response is received within the timeout.
+ */
private final long timeoutMS;
- private final TimeUnit unit;
- private AtomicBoolean isClosed = new AtomicBoolean();
- HeartBeatConnectionFactory(final ConnectionFactory factory) {
- this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, null);
- }
+ /**
+ * List of valid connections to which heartbeats will be sent.
+ */
+ private final List<ConnectionImpl> validConnections = new LinkedList<ConnectionImpl>();
HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
- final TimeUnit unit) {
- this(factory, interval, unit, DEFAULT_SEARCH, null);
- }
-
- HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
- final TimeUnit unit, final SearchRequest heartBeat) {
- this(factory, interval, unit, heartBeat, null);
- }
-
- HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
- final TimeUnit unit, final SearchRequest heartBeat,
+ final long timeout, final TimeUnit unit, final SearchRequest heartBeat,
final ScheduledExecutorService scheduler) {
- Validator.ensureNotNull(factory, heartBeat, unit);
- Validator.ensureTrue(interval >= 0, "negative timeout");
+ Validator.ensureNotNull(factory, unit);
+ Validator.ensureTrue(interval >= 0, "negative interval");
+ Validator.ensureTrue(timeout >= 0, "negative timeout");
- this.heartBeatRequest = heartBeat;
+ this.heartBeatRequest = heartBeat != null ? heartBeat : DEFAULT_SEARCH;
this.interval = interval;
- this.unit = unit;
- this.activeConnections = new LinkedList<ConnectionImpl>();
+ this.intervalUnit = unit;
this.factory = factory;
this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
- this.timeoutMS = unit.toMillis(interval) * 2;
+ this.timeoutMS = unit.toMillis(timeout);
this.minDelayMS = unit.toMillis(interval) / 2;
}
@Override
public void close() {
if (isClosed.compareAndSet(false, true)) {
- synchronized (activeConnections) {
- if (!activeConnections.isEmpty()) {
+ synchronized (validConnections) {
+ if (!validConnections.isEmpty()) {
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format(
"HeartbeatConnectionFactory '%s' is closing while %d "
- + "active connections remain", toString(),
- activeConnections.size()));
+ + "active connections remain", toString(), validConnections
+ .size()));
}
}
}
@@ -944,23 +1198,41 @@
@Override
public Connection getConnection() throws ErrorResultException {
- return adaptConnection(factory.getConnection());
+ /*
+ * Immediately send a heart beat in order to determine if the connected
+ * server is responding.
+ */
+ final Connection connection = factory.getConnection();
+ boolean keepConnection = false;
+ try {
+ connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS,
+ TimeUnit.MILLISECONDS);
+ keepConnection = true;
+ return adaptConnection(connection);
+ } catch (final Exception e) {
+ throw adaptHeartBeatError(e);
+ } finally {
+ if (!keepConnection) {
+ connection.close();
+ }
+ }
}
@Override
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> handler) {
- final FutureResultTransformer<Connection, Connection> future =
- new FutureResultTransformer<Connection, Connection>(handler) {
- @Override
- protected Connection transformResult(final Connection connection)
- throws ErrorResultException {
- return adaptConnection(connection);
- }
- };
+ // Create a future responsible for chaining the initial heartbeat search.
+ final ConnectionFutureResultImpl compositeFuture = new ConnectionFutureResultImpl(handler);
- future.setFutureResult(factory.getConnectionAsync(future));
- return future;
+ // Request a connection.
+ final FutureResult<Connection> connectionFuture =
+ factory.getConnectionAsync(compositeFuture.futureConnectionResult);
+
+ // Set the connection future in the composite so that the returned search future can delegate.
+ compositeFuture.futureConnectionResult.setFutureResult(connectionFuture);
+
+ // Return the future representing the heartbeat.
+ return compositeFuture.futureSearchResult;
}
@Override
@@ -973,26 +1245,42 @@
}
private Connection adaptConnection(final Connection connection) {
- final ConnectionImpl heartBeatConnection = new ConnectionImpl(connection);
- synchronized (activeConnections) {
- connection.addConnectionEventListener(heartBeatConnection);
- if (activeConnections.isEmpty()) {
+ synchronized (validConnections) {
+ final ConnectionImpl heartBeatConnection = new ConnectionImpl(connection);
+ if (validConnections.isEmpty()) {
/* This is the first active connection, so start the heart beat. */
- heartBeatFuture = scheduler.get().scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- final ConnectionImpl[] tmp;
- synchronized (activeConnections) {
- tmp = activeConnections.toArray(new ConnectionImpl[0]);
- }
- for (final ConnectionImpl connection : tmp) {
- connection.sendHeartBeat();
- }
- }
- }, 0, interval, unit);
+ heartBeatFuture =
+ scheduler.get().scheduleWithFixedDelay(sendHeartBeatRunnable, 0, interval,
+ intervalUnit);
}
- activeConnections.add(heartBeatConnection);
+ validConnections.add(heartBeatConnection);
+ return heartBeatConnection;
}
- return heartBeatConnection;
+ }
+
+ private ErrorResultException adaptHeartBeatError(final Exception error) {
+ if (error instanceof ConnectionException) {
+ return (ErrorResultException) error;
+ } else if (error instanceof TimeoutResultException || error instanceof TimeoutException) {
+ return newHeartBeatTimeoutError();
+ } else if (error instanceof InterruptedException) {
+ return newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, error);
+ } else {
+ return newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, HBCF_HEARTBEAT_FAILED.get(),
+ error);
+ }
+ }
+
+ private ConnectionImpl[] getValidConnections() {
+ final ConnectionImpl[] tmp;
+ synchronized (validConnections) {
+ tmp = validConnections.toArray(new ConnectionImpl[0]);
+ }
+ return tmp;
+ }
+
+ private ErrorResultException newHeartBeatTimeoutError() {
+ return newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, HBCF_HEARTBEAT_TIMEOUT
+ .get(timeoutMS));
}
}
diff --git a/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties b/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
index abe82c2..1057f00 100755
--- a/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
+++ b/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
@@ -1418,3 +1418,7 @@
ERR_ENTRY_INCREMENT_CANNOT_PARSE_AS_INT=Unable to increment the value of \
attribute '%s' because either the current value or the increment could \
not be parsed as an integer
+HBCF_CONNECTION_CLOSED_BY_CLIENT=Connection closed by client
+HBCF_HEARTBEAT_FAILED=Heartbeat failed
+HBCF_HEARTBEAT_TIMEOUT=Heartbeat timed out after %d ms
+
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/ConnectionStateTest.java b/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/ConnectionStateTest.java
new file mode 100644
index 0000000..18ef073
--- /dev/null
+++ b/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/ConnectionStateTest.java
@@ -0,0 +1,250 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS.
+ */
+package com.forgerock.opendj.ldap;
+
+import static org.fest.assertions.Assertions.assertThat;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
+import static org.forgerock.opendj.ldap.responses.Responses.newGenericExtendedResult;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.forgerock.opendj.ldap.ConnectionEventListener;
+import org.forgerock.opendj.ldap.ErrorResultException;
+import org.forgerock.opendj.ldap.ResultCode;
+import org.forgerock.opendj.ldap.responses.ExtendedResult;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@linkConnectionState}.
+ */
+@SuppressWarnings("javadoc")
+public class ConnectionStateTest extends LDAPTestCase {
+ private static final ErrorResultException ERROR = newErrorResult(ResultCode.OTHER);
+ private static final ErrorResultException LATE_ERROR = newErrorResult(ResultCode.BUSY);
+ private static final ExtendedResult UNSOLICITED =
+ newGenericExtendedResult(ResultCode.OPERATIONS_ERROR);
+
+ @Test
+ public void testCloseEventInClosedState() {
+ final ConnectionState state = new ConnectionState();
+ final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener1);
+ state.notifyConnectionClosed();
+ state.notifyConnectionClosed();
+
+ assertThat(state.isValid()).isFalse();
+ assertThat(state.isClosed()).isTrue();
+ assertThat(state.getConnectionError()).isNull();
+ verify(listener1).handleConnectionClosed();
+ verifyNoMoreInteractions(listener1);
+
+ // Listeners registered after event should be notified immediately.
+ final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener2);
+ verify(listener2).handleConnectionClosed();
+ verifyNoMoreInteractions(listener2);
+ }
+
+ @Test
+ public void testCloseEventInErrorState() {
+ final ConnectionState state = new ConnectionState();
+ final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener1);
+ state.notifyConnectionError(false, ERROR);
+ state.notifyConnectionClosed();
+
+ assertThat(state.isValid()).isFalse();
+ assertThat(state.isClosed()).isTrue();
+ assertThat(state.getConnectionError()).isSameAs(ERROR);
+ verify(listener1).handleConnectionError(false, ERROR);
+ verify(listener1).handleConnectionClosed();
+ verifyNoMoreInteractions(listener1);
+
+ // Listeners registered after event should be notified immediately.
+ final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener2);
+ verify(listener2).handleConnectionError(false, ERROR);
+ verify(listener2).handleConnectionClosed();
+ verifyNoMoreInteractions(listener2);
+ }
+
+ @Test
+ public void testCloseEventInValidState() {
+ final ConnectionState state = new ConnectionState();
+ final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener1);
+ state.notifyConnectionClosed();
+
+ assertThat(state.isValid()).isFalse();
+ assertThat(state.isClosed()).isTrue();
+ assertThat(state.getConnectionError()).isNull();
+ verify(listener1).handleConnectionClosed();
+ verifyNoMoreInteractions(listener1);
+
+ // Listeners registered after event should be notified immediately.
+ final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener2);
+ verify(listener2).handleConnectionClosed();
+ verifyNoMoreInteractions(listener2);
+ }
+
+ @Test
+ public void testErrorEventInClosedState() {
+ final ConnectionState state = new ConnectionState();
+ final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener1);
+ state.notifyConnectionClosed();
+ state.notifyConnectionError(false, ERROR);
+
+ assertThat(state.isValid()).isFalse();
+ assertThat(state.isClosed()).isTrue();
+ assertThat(state.getConnectionError()).isNull();
+ verify(listener1).handleConnectionClosed();
+ verifyNoMoreInteractions(listener1);
+
+ // Listeners registered after event should be notified immediately.
+ final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener2);
+ verify(listener2).handleConnectionClosed();
+ verifyNoMoreInteractions(listener2);
+ }
+
+ @Test
+ public void testErrorEventInErrorState() {
+ final ConnectionState state = new ConnectionState();
+ final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener1);
+ state.notifyConnectionError(false, ERROR);
+ state.notifyConnectionError(false, LATE_ERROR);
+
+ assertThat(state.isValid()).isFalse();
+ assertThat(state.isClosed()).isFalse();
+ assertThat(state.getConnectionError()).isSameAs(ERROR);
+ verify(listener1).handleConnectionError(false, ERROR);
+ verifyNoMoreInteractions(listener1);
+
+ // Listeners registered after event should be notified immediately.
+ final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener2);
+ assertThat(state.getConnectionError()).isSameAs(ERROR);
+ verify(listener2).handleConnectionError(false, ERROR);
+ verifyNoMoreInteractions(listener2);
+ }
+
+ @Test
+ public void testErrorEventInValidState() {
+ final ConnectionState state = new ConnectionState();
+ final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener1);
+ state.notifyConnectionError(false, ERROR);
+
+ assertThat(state.isValid()).isFalse();
+ assertThat(state.isClosed()).isFalse();
+ assertThat(state.getConnectionError()).isSameAs(ERROR);
+ verify(listener1).handleConnectionError(false, ERROR);
+ verifyNoMoreInteractions(listener1);
+
+ // Listeners registered after event should be notified immediately.
+ final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener2);
+ verify(listener2).handleConnectionError(false, ERROR);
+ verifyNoMoreInteractions(listener2);
+ }
+
+ @Test
+ public void testRemoveEventListener() {
+ final ConnectionState state = new ConnectionState();
+ final ConnectionEventListener listener = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener);
+ state.notifyConnectionError(false, ERROR);
+ verify(listener).handleConnectionError(false, ERROR);
+ state.removeConnectionEventListener(listener);
+ state.notifyConnectionClosed();
+ verifyNoMoreInteractions(listener);
+ }
+
+ @Test
+ public void testUnsolicitedNotificationEventInClosedState() {
+ final ConnectionState state = new ConnectionState();
+ final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener1);
+ state.notifyConnectionClosed();
+ state.notifyUnsolicitedNotification(UNSOLICITED);
+
+ assertThat(state.isValid()).isFalse();
+ assertThat(state.isClosed()).isTrue();
+ assertThat(state.getConnectionError()).isNull();
+ verify(listener1).handleConnectionClosed();
+ verifyNoMoreInteractions(listener1);
+ }
+
+ @Test
+ public void testUnsolicitedNotificationEventInErrorState() {
+ final ConnectionState state = new ConnectionState();
+ final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener1);
+ state.notifyConnectionError(false, ERROR);
+ state.notifyUnsolicitedNotification(UNSOLICITED);
+
+ assertThat(state.isValid()).isFalse();
+ assertThat(state.isClosed()).isFalse();
+ assertThat(state.getConnectionError()).isSameAs(ERROR);
+ verify(listener1).handleConnectionError(false, ERROR);
+ verifyNoMoreInteractions(listener1);
+ }
+
+ @Test
+ public void testUnsolicitedNotificationEventInValidState() {
+ final ConnectionState state = new ConnectionState();
+ final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener1);
+ state.notifyUnsolicitedNotification(UNSOLICITED);
+
+ assertThat(state.isValid()).isTrue();
+ assertThat(state.isClosed()).isFalse();
+ assertThat(state.getConnectionError()).isNull();
+ verify(listener1).handleUnsolicitedNotification(same(UNSOLICITED));
+ verifyNoMoreInteractions(listener1);
+
+ /*
+ * Listeners registered after event will not be notified (unsolicited
+ * notifications are not cached).
+ */
+ final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+ state.addConnectionEventListener(listener2);
+ verifyNoMoreInteractions(listener2);
+ }
+
+ @Test
+ public void testValidState() {
+ final ConnectionState state = new ConnectionState();
+ assertThat(state.isValid()).isTrue();
+ assertThat(state.isClosed()).isFalse();
+ assertThat(state.getConnectionError()).isNull();
+ }
+}
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/FutureResultTransformerTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/FutureResultTransformerTestCase.java
new file mode 100644
index 0000000..5372fd7
--- /dev/null
+++ b/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/FutureResultTransformerTestCase.java
@@ -0,0 +1,132 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2013 ForgeRock AS.
+ */
+
+package com.forgerock.opendj.util;
+
+import static org.fest.assertions.Assertions.assertThat;
+import static org.fest.assertions.Fail.fail;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.TimeUnit;
+
+import org.forgerock.opendj.ldap.ErrorResultException;
+import org.forgerock.opendj.ldap.ResultCode;
+import org.forgerock.opendj.ldap.ResultHandler;
+import org.testng.annotations.Test;
+
+/**
+ * Tests {@link FutureResultTransformer}.
+ */
+@SuppressWarnings("javadoc")
+public class FutureResultTransformerTestCase extends UtilTestCase {
+ private static final class TestFuture extends FutureResultTransformer<Integer, String> {
+ public TestFuture(final ResultHandler<? super String> handler) {
+ super(handler);
+ }
+
+ @Override
+ protected ErrorResultException transformErrorResult(final ErrorResultException error) {
+ assertThat(error).isSameAs(UNTRANSFORMED_ERROR);
+ return TRANSFORMED_ERROR;
+ }
+
+ @Override
+ protected String transformResult(final Integer result) throws ErrorResultException {
+ assertThat(result).isSameAs(UNTRANSFORMED_RESULT);
+ return TRANSFORMED_RESULT;
+ }
+ }
+
+ private static final ErrorResultException TRANSFORMED_ERROR = newErrorResult(ResultCode.OTHER,
+ "transformed");
+ private static final String TRANSFORMED_RESULT = "transformed";
+ private static final ErrorResultException UNTRANSFORMED_ERROR = newErrorResult(
+ ResultCode.OTHER, "untransformed");
+ private static final Integer UNTRANSFORMED_RESULT = Integer.valueOf(0);
+
+ @Test
+ public void testGetTransformsError() throws Exception {
+ final TestFuture future = new TestFuture(null);
+ future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_ERROR));
+ future.handleErrorResult(UNTRANSFORMED_ERROR);
+ try {
+ future.get();
+ fail();
+ } catch (final ErrorResultException e) {
+ assertThat(e).isSameAs(TRANSFORMED_ERROR);
+ }
+ }
+
+ @Test
+ public void testGetTransformsResult() throws Exception {
+ final TestFuture future = new TestFuture(null);
+ future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_RESULT));
+ future.handleResult(UNTRANSFORMED_RESULT);
+ assertThat(future.get()).isSameAs(TRANSFORMED_RESULT);
+ }
+
+ @Test
+ public void testGetWithTimeoutTransformsError() throws Exception {
+ final TestFuture future = new TestFuture(null);
+ future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_ERROR));
+ future.handleErrorResult(UNTRANSFORMED_ERROR);
+ try {
+ future.get(100, TimeUnit.SECONDS);
+ fail();
+ } catch (final ErrorResultException e) {
+ assertThat(e).isSameAs(TRANSFORMED_ERROR);
+ }
+ }
+
+ @Test
+ public void testGetWithTimeoutTransformsResult() throws Exception {
+ final TestFuture future = new TestFuture(null);
+ future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_RESULT));
+ future.handleResult(UNTRANSFORMED_RESULT);
+ assertThat(future.get(100, TimeUnit.SECONDS)).isSameAs(TRANSFORMED_RESULT);
+ }
+
+ @Test
+ public void testResultHandlerTransformsError() throws Exception {
+ @SuppressWarnings("unchecked")
+ final ResultHandler<String> handler = mock(ResultHandler.class);
+ final TestFuture future = new TestFuture(handler);
+ future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_ERROR));
+ future.handleErrorResult(UNTRANSFORMED_ERROR);
+ verify(handler).handleErrorResult(TRANSFORMED_ERROR);
+ }
+
+ @Test
+ public void testResultHandlerTransformsResult() throws Exception {
+ @SuppressWarnings("unchecked")
+ final ResultHandler<String> handler = mock(ResultHandler.class);
+ final TestFuture future = new TestFuture(handler);
+ future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_RESULT));
+ future.handleResult(UNTRANSFORMED_RESULT);
+ verify(handler).handleResult(TRANSFORMED_RESULT);
+ }
+}
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java
index b8f4815..49d573d 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java
@@ -181,7 +181,7 @@
assertThat(scheduler.isScheduled()).isTrue();
// Forcefully run the monitor task and check that the first factory is online.
- scheduler.getCommand().run();
+ scheduler.runFirstTask();
verify(listener).handleConnectionFactoryOnline(firstAsync);
verifyNoMoreInteractions(listener);
} finally {
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
index 4361cc5..8ceb48f 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS
+ * Portions copyright 2011-2013 ForgeRock AS
*/
package org.forgerock.opendj.ldap;
@@ -138,8 +138,8 @@
"objectclass=*", "cn");
factories[0][0] =
- new HeartBeatConnectionFactory(new LDAPConnectionFactory(getServerSocketAddress()),
- 1000, TimeUnit.MILLISECONDS, request);
+ Connections.newHeartBeatConnectionFactory(new LDAPConnectionFactory(
+ getServerSocketAddress()), 1000, 500, TimeUnit.MILLISECONDS, request);
// InternalConnectionFactory
factories[1][0] = Connections.newInternalConnectionFactory(LDAPServer.getInstance(), null);
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
index 83348e3..0dd7596 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
@@ -44,7 +44,6 @@
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.forgerock.opendj.ldap.requests.BindRequest;
@@ -448,10 +447,7 @@
assertThat(scheduler.isScheduled()).isTrue();
// First populate the pool with idle connections at time 0.
- @SuppressWarnings("unchecked")
- final Callable<Long> timeSource = mock(Callable.class);
- when(timeSource.call()).thenReturn(0L);
- pool.testTimeSource = timeSource;
+ pool.timeSource = mockTimeSource(0);
assertThat(pool.currentPoolSize()).isEqualTo(0);
Connection c1 = pool.getConnection();
@@ -466,13 +462,13 @@
assertThat(pool.currentPoolSize()).isEqualTo(4);
// First purge at time 50 is no-op because no connections have expired.
- when(timeSource.call()).thenReturn(50L);
- scheduler.getCommand().run();
+ when(pool.timeSource.currentTimeMillis()).thenReturn(50L);
+ scheduler.runFirstTask();
assertThat(pool.currentPoolSize()).isEqualTo(4);
// Second purge at time 150 should remove 2 non-core connections.
- when(timeSource.call()).thenReturn(150L);
- scheduler.getCommand().run();
+ when(pool.timeSource.currentTimeMillis()).thenReturn(150L);
+ scheduler.runFirstTask();
assertThat(pool.currentPoolSize()).isEqualTo(2);
verify(pooledConnection1, times(1)).close();
@@ -481,7 +477,7 @@
verify(pooledConnection4, times(0)).close();
// Regrow the pool at time 200.
- when(timeSource.call()).thenReturn(200L);
+ when(pool.timeSource.currentTimeMillis()).thenReturn(200L);
Connection c5 = pool.getConnection(); // pooledConnection3
Connection c6 = pool.getConnection(); // pooledConnection4
Connection c7 = pool.getConnection(); // pooledConnection5
@@ -494,13 +490,13 @@
assertThat(pool.currentPoolSize()).isEqualTo(4);
// Third purge at time 250 should not remove any connections.
- when(timeSource.call()).thenReturn(250L);
- scheduler.getCommand().run();
+ when(pool.timeSource.currentTimeMillis()).thenReturn(250L);
+ scheduler.runFirstTask();
assertThat(pool.currentPoolSize()).isEqualTo(4);
// Fourth purge at time 350 should remove 2 non-core connections.
- when(timeSource.call()).thenReturn(350L);
- scheduler.getCommand().run();
+ when(pool.timeSource.currentTimeMillis()).thenReturn(350L);
+ scheduler.runFirstTask();
assertThat(pool.currentPoolSize()).isEqualTo(2);
verify(pooledConnection3, times(1)).close();
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
index 06e5337..e44cf75 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
@@ -27,13 +27,15 @@
package org.forgerock.opendj.ldap;
import static org.fest.assertions.Assertions.assertThat;
-import static org.forgerock.opendj.ldap.Connections.newHeartBeatConnectionFactory;
+import static org.fest.assertions.Fail.fail;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static org.forgerock.opendj.ldap.SearchScope.BASE_OBJECT;
import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnection;
import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnectionFactory;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockTimeSource;
import static org.forgerock.opendj.ldap.requests.Requests.newModifyRequest;
import static org.forgerock.opendj.ldap.requests.Requests.newSearchRequest;
+import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
import static org.forgerock.opendj.ldap.responses.Responses.newResult;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
@@ -46,14 +48,22 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.responses.Result;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.forgerock.opendj.util.CompletedFutureResult;
+import com.forgerock.opendj.util.StaticUtils;
+
/**
* Tests the connection pool implementation..
*/
@@ -78,48 +88,169 @@
// @formatter:on
- @Test(enabled = false)
- public void testHeartBeatTimeout() throws Exception {
- // Mock connection which never responds to heartbeat.
- final List<ConnectionEventListener> listeners = new LinkedList<ConnectionEventListener>();
- final Connection connection = mockConnection(listeners);
- when(connection.isValid()).thenReturn(true);
+ private static final SearchRequest HEARTBEAT = newSearchRequest("dc=test", BASE_OBJECT,
+ "(objectclass=*)", "1.1");
- // Underlying connection factory.
- final ConnectionFactory factory = mockConnectionFactory(connection);
+ private Connection connection;
+ private ConnectionFactory factory;
+ private Connection hbc;
+ private HeartBeatConnectionFactory hbcf;
+ private List<ConnectionEventListener> listeners;
+ private MockScheduler scheduler;
- // Create heart beat connection factory.
- final SearchRequest hb = newSearchRequest("dc=test", BASE_OBJECT, "(objectclass=*)", "1.1");
- final MockScheduler scheduler = new MockScheduler();
- final ConnectionFactory hbcf =
- newHeartBeatConnectionFactory(factory, 0, TimeUnit.MILLISECONDS, hb, scheduler);
+ /**
+ * Disables logging before the tests.
+ */
+ @BeforeClass()
+ public void disableLogging() {
+ StaticUtils.DEBUG_LOG.setLevel(Level.SEVERE);
+ }
- // First connection should cause heart beat to be scheduled.
- final Connection hbc = hbcf.getConnection();
- assertThat(scheduler.isScheduled()).isTrue();
- assertThat(scheduler.getCommand()).isNotNull();
- assertThat(scheduler.getDelay()).isEqualTo(0);
- assertThat(scheduler.getUnit()).isEqualTo(TimeUnit.MILLISECONDS);
+ /**
+ * Re-enable logging after the tests.
+ */
+ @AfterClass()
+ public void enableLogging() {
+ StaticUtils.DEBUG_LOG.setLevel(Level.INFO);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ try {
+ if (hbc != null) {
+ hbc.close();
+ assertThat(hbc.isValid()).isFalse();
+ assertThat(hbc.isClosed()).isTrue();
+ }
+ scheduler.runAllTasks(); // Flush any remaining timeout tasks.
+ assertThat(scheduler.isScheduled()).isFalse(); // No more connections to check.
+ hbcf.close();
+ } finally {
+ connection = null;
+ factory = null;
+ hbcf = null;
+ listeners = null;
+ scheduler = null;
+ hbc = null;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testBindWhileHeartBeatInProgress() throws Exception {
+ // Mock connection with successful heartbeat.
+ init(ResultCode.SUCCESS);
+
+ // Get a connection.
+ hbc = hbcf.getConnection();
+
+ /*
+ * Send a heartbeat, trapping the search call-back so that we can send
+ * the response once we have attempted a bind.
+ */
+ when(
+ connection.searchAsync(any(SearchRequest.class),
+ any(IntermediateResponseHandler.class), any(SearchResultHandler.class)))
+ .thenReturn(null);
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L);
+ scheduler.runAllTasks(); // Send the heartbeat.
+
+ // Capture the heartbeat search result handler.
+ final ArgumentCaptor<SearchResultHandler> arg =
+ ArgumentCaptor.forClass(SearchResultHandler.class);
+ verify(connection, times(2)).searchAsync(same(HEARTBEAT),
+ any(IntermediateResponseHandler.class), arg.capture());
+ assertThat(hbc.isValid()).isTrue(); // Not checked yet.
+
+ /*
+ * Now attempt a bind request, which should be held in a queue until the
+ * heart beat completes.
+ */
+ hbc.bindAsync(newSimpleBindRequest(), null, null);
+ verify(connection, times(0)).bindAsync(any(BindRequest.class),
+ any(IntermediateResponseHandler.class), any(ResultHandler.class));
+
+ // Send fake heartbeat response, releasing the bind request.
+ arg.getValue().handleResult(newResult(ResultCode.SUCCESS));
+ verify(connection, times(1)).bindAsync(any(BindRequest.class),
+ any(IntermediateResponseHandler.class), any(ResultHandler.class));
+ }
+
+ @Test
+ public void testGetConnection() throws Exception {
+ // Mock connection with successful initial heartbeat.
+ init(ResultCode.SUCCESS);
+ hbc = hbcf.getConnection();
+ assertThat(hbc).isNotNull();
+ assertThat(hbc.isValid()).isTrue();
+ }
+
+ @Test
+ public void testGetConnectionAsync() throws Exception {
+ // Mock connection with successful initial heartbeat.
+ init(ResultCode.SUCCESS);
+ hbc = hbcf.getConnectionAsync(null).get();
+ assertThat(hbc).isNotNull();
+ assertThat(hbc.isValid()).isTrue();
+ }
+
+ @Test
+ public void testGetConnectionAsyncWithInitialHeartBeatError() throws Exception {
+ // Mock connection with failing initial heartbeat.
+ init(ResultCode.BUSY);
+ try {
+ hbcf.getConnectionAsync(null).get();
+ fail("Unexpectedly obtained a connection");
+ } catch (final ErrorResultException e) {
+ checkInitialHeartBeatFailure(e);
+ }
+ }
+
+ @Test
+ public void testGetConnectionWithInitialHeartBeatError() throws Exception {
+ // Mock connection with failing initial heartbeat.
+ init(ResultCode.BUSY);
+ try {
+ hbcf.getConnection();
+ fail("Unexpectedly obtained a connection");
+ } catch (final ErrorResultException e) {
+ checkInitialHeartBeatFailure(e);
+ }
+ }
+
+ @Test
+ public void testHeartBeatSucceedsThenFails() throws Exception {
+ // Mock connection with successful heartbeat.
+ init(ResultCode.SUCCESS);
+
+ // Get a connection and check that it was pinged.
+ hbc = hbcf.getConnection();
+
+ verifyHeartBeatSent(connection, 1);
+ assertThat(scheduler.isScheduled()).isTrue(); // heartbeater
assertThat(listeners).hasSize(1);
+ assertThat(hbc.isValid()).isTrue();
- // The connection should be immediately invalid due to 0 timeout.
- assertThat(connection.isValid()).isTrue();
+ // Invoke heartbeat before the connection is considered idle.
+ scheduler.runAllTasks();
+ verifyHeartBeatSent(connection, 1); // No heartbeat sent - not idle yet.
+ assertThat(hbc.isValid()).isTrue();
+
+ // Invoke heartbeat after the connection is considered idle.
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(6000L);
+ scheduler.runAllTasks();
+ verifyHeartBeatSent(connection, 2); // Heartbeat sent.
+ assertThat(hbc.isValid()).isTrue();
+
+ // Now force the heartbeat to fail.
+ mockHeartBeatResponse(connection, listeners, ResultCode.CLIENT_SIDE_SERVER_DOWN);
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L);
+ scheduler.runAllTasks();
+ verifyHeartBeatSent(connection, 3);
assertThat(hbc.isValid()).isFalse();
- /*
- * Attempt to send heartbeat. This should trigger the connection to be
- * closed and all subsequent request attempts to fail.
- */
- scheduler.getCommand().run();
-
- // The underlying connection should have been closed.
- verify(connection).close();
-
- /*
- * ...and the scheduled heart beat stopped because there are no
- * remaining connections.
- */
- assertThat(scheduler.isScheduled()).isFalse();
+ // Flush redundant timeout tasks.
+ scheduler.runAllTasks();
// Attempt to send a new request: it should fail immediately.
@SuppressWarnings("unchecked")
@@ -130,120 +261,150 @@
verify(mockHandler).handleErrorResult(arg.capture());
assertThat(arg.getValue().getResult().getResultCode()).isEqualTo(
ResultCode.CLIENT_SIDE_SERVER_DOWN);
+
+ assertThat(hbc.isValid()).isFalse();
+ assertThat(hbc.isClosed()).isFalse();
}
@Test
- public void testSchedulerRegistration() throws Exception {
- // Three mock connections.
- final List<ConnectionEventListener> listeners1 = new LinkedList<ConnectionEventListener>();
- final Connection connection1 = heartBeat(mockConnection(listeners1), ResultCode.SUCCESS);
+ public void testHeartBeatTimeout() throws Exception {
+ // Mock connection with successful heartbeat.
+ init(ResultCode.SUCCESS);
- final List<ConnectionEventListener> listeners2 = new LinkedList<ConnectionEventListener>();
- final Connection connection2 = heartBeat(mockConnection(listeners2), ResultCode.SUCCESS);
+ // Get a connection and check that it was pinged.
+ hbc = hbcf.getConnection();
- final List<ConnectionEventListener> listeners3 = new LinkedList<ConnectionEventListener>();
- final Connection connection3 = heartBeat(mockConnection(listeners3), ResultCode.SUCCESS);
-
- // Underlying connection factory.
- final ConnectionFactory factory =
- mockConnectionFactory(connection1, connection2, connection3);
-
- // Create heart beat connection factory.
- final SearchRequest hb = newSearchRequest("dc=test", BASE_OBJECT, "(objectclass=*)", "1.1");
- final MockScheduler scheduler = new MockScheduler();
- final ConnectionFactory hbcf =
- newHeartBeatConnectionFactory(factory, 0, TimeUnit.MILLISECONDS, hb, scheduler);
-
- // Heart beat should not be scheduled yet.
- assertThat(scheduler.isScheduled()).isFalse();
-
- // First connection should cause heart beat to be scheduled.
- hbcf.getConnection();
- assertThat(scheduler.isScheduled()).isTrue();
- assertThat(scheduler.getCommand()).isNotNull();
- assertThat(scheduler.getDelay()).isEqualTo(0);
- assertThat(scheduler.getUnit()).isEqualTo(TimeUnit.MILLISECONDS);
- assertThat(listeners1).hasSize(1);
-
- // Second connection should not change anything.
- hbcf.getConnection();
- assertThat(scheduler.isScheduled()).isTrue();
- assertThat(listeners2).hasSize(1);
-
- // Check heart-beat sent to both connections.
- scheduler.getCommand().run();
- verify(connection1, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
- any(SearchResultHandler.class));
- verify(connection2, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
- any(SearchResultHandler.class));
- verify(connection3, times(0)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
- any(SearchResultHandler.class));
-
- // Close first connection: heart beat should still be scheduled.
- listeners1.get(0).handleConnectionClosed();
- assertThat(scheduler.isScheduled()).isTrue();
- assertThat(listeners1).isEmpty();
-
- // Check heart-beat only sent to second connection.
- scheduler.getCommand().run();
- verify(connection1, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
- any(SearchResultHandler.class));
- verify(connection2, times(2)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
- any(SearchResultHandler.class));
- verify(connection3, times(0)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
- any(SearchResultHandler.class));
-
- // Close second connection: heart beat should now be stopped.
- listeners2.get(0).handleConnectionClosed();
- assertThat(scheduler.isScheduled()).isFalse();
- assertThat(listeners2).isEmpty();
-
- // Opening another connection should restart the heart beat.
- hbcf.getConnection();
- assertThat(scheduler.isScheduled()).isTrue();
- assertThat(scheduler.getCommand()).isNotNull();
- assertThat(scheduler.getDelay()).isEqualTo(0);
- assertThat(scheduler.getUnit()).isEqualTo(TimeUnit.MILLISECONDS);
- assertThat(listeners3).hasSize(1);
-
- // Check heart-beat only sent to third connection.
- scheduler.getCommand().run();
- verify(connection1, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
- any(SearchResultHandler.class));
- verify(connection2, times(2)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
- any(SearchResultHandler.class));
- verify(connection3, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
- any(SearchResultHandler.class));
+ // Now force the heartbeat to fail due to timeout.
+ mockHeartBeatResponse(connection, listeners, null /* no response */);
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L);
+ scheduler.runAllTasks(); // Send the heartbeat.
+ verifyHeartBeatSent(connection, 2);
+ assertThat(hbc.isValid()).isTrue(); // Not checked yet.
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(12000L);
+ scheduler.runAllTasks(); // Check for heartbeat.
+ assertThat(hbc.isValid()).isFalse(); // Now invalid.
+ assertThat(hbc.isClosed()).isFalse();
}
- private Connection heartBeat(final Connection mockConnection, final ResultCode resultCode) {
- doAnswer(new Answer<Void>() {
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testHeartBeatWhileBindInProgress() throws Exception {
+ // Mock connection with successful heartbeat.
+ init(ResultCode.SUCCESS);
+
+ // Get a connection.
+ hbc = hbcf.getConnection();
+
+ /*
+ * Send a bind request, trapping the bind call-back so that we can send
+ * the response once we have attempted a heartbeat.
+ */
+ when(
+ connection.bindAsync(any(BindRequest.class),
+ any(IntermediateResponseHandler.class), any(ResultHandler.class)))
+ .thenReturn(null);
+ hbc.bindAsync(newSimpleBindRequest(), null, null);
+
+ // Capture the bind result handler.
+ @SuppressWarnings("rawtypes")
+ final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class);
+ verify(connection, times(1)).bindAsync(any(BindRequest.class),
+ any(IntermediateResponseHandler.class), arg.capture());
+
+ /*
+ * Now attempt the heartbeat which should not happen because there is a
+ * bind in progress.
+ */
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L);
+ scheduler.runAllTasks(); // Attempt to send the heartbeat.
+ verify(connection, times(1)).searchAsync(same(HEARTBEAT),
+ any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
+
+ // Send fake bind response, releasing the heartbeat.
+ arg.getValue().handleResult(newResult(ResultCode.SUCCESS));
+
+ // Attempt to send a heartbeat again.
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(16000L);
+ scheduler.runAllTasks(); // Attempt to send the heartbeat.
+ verify(connection, times(2)).searchAsync(same(HEARTBEAT),
+ any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
+ }
+
+ @Test
+ public void testToString() {
+ init(ResultCode.SUCCESS);
+ assertThat(hbcf.toString()).isNotNull();
+ }
+
+ private void checkInitialHeartBeatFailure(final ErrorResultException e) {
+ /*
+ * Initial heartbeat failure should trigger connection exception with
+ * heartbeat cause.
+ */
+ assertThat(e).isInstanceOf(ConnectionException.class);
+ assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_SERVER_DOWN);
+ assertThat(e.getCause()).isInstanceOf(ErrorResultException.class);
+ assertThat(((ErrorResultException) e.getCause()).getResult().getResultCode()).isEqualTo(
+ ResultCode.BUSY);
+ }
+
+ private void init(final ResultCode initialHeartBeatResult) {
+ // Mock connection with successful heartbeat.
+ listeners = new LinkedList<ConnectionEventListener>();
+ connection = mockConnection(listeners);
+ when(connection.isValid()).thenReturn(true);
+ mockHeartBeatResponse(connection, listeners, initialHeartBeatResult);
+
+ // Underlying connection factory.
+ factory = mockConnectionFactory(connection);
+
+ // Create heart beat connection factory.
+ scheduler = new MockScheduler();
+ hbcf =
+ new HeartBeatConnectionFactory(factory, 10000, 100, TimeUnit.MILLISECONDS,
+ HEARTBEAT, scheduler);
+
+ // Set initial time stamp.
+ hbcf.timeSource = mockTimeSource(0);
+ }
+
+ private Connection mockHeartBeatResponse(final Connection mockConnection,
+ final List<ConnectionEventListener> listeners, final ResultCode resultCode) {
+ doAnswer(new Answer<FutureResult<Result>>() {
@Override
- public Void answer(final InvocationOnMock invocation) throws Throwable {
+ public FutureResult<Result> answer(final InvocationOnMock invocation) throws Throwable {
+ if (resultCode == null) {
+ return null;
+ }
+
final SearchResultHandler handler =
(SearchResultHandler) invocation.getArguments()[2];
if (resultCode.isExceptional()) {
- handler.handleErrorResult(newErrorResult(resultCode));
+ final ErrorResultException error = newErrorResult(resultCode);
+ if (handler != null) {
+ handler.handleErrorResult(error);
+ }
+ if (error instanceof ConnectionException) {
+ for (final ConnectionEventListener listener : listeners) {
+ listener.handleConnectionError(false, error);
+ }
+ }
+ return new CompletedFutureResult<Result>(error);
} else {
- handler.handleResult(newResult(resultCode));
+ final Result result = newResult(resultCode);
+ if (handler != null) {
+ handler.handleResult(result);
+ }
+ return new CompletedFutureResult<Result>(result);
}
- return null;
}
}).when(mockConnection).searchAsync(any(SearchRequest.class),
any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
return mockConnection;
}
- // private void sleepFor(final long ms) {
- // // Avoid premature wake-ups.
- // final long until = System.currentTimeMillis() + ms;
- // do {
- // try {
- // Thread.sleep(until - System.currentTimeMillis());
- // } catch (final InterruptedException e) {
- // Thread.currentThread().interrupt();
- // return;
- // }
- // } while (System.currentTimeMillis() < until);
- // }
+ private void verifyHeartBeatSent(final Connection connection, final int times) {
+ verify(connection, times(times)).searchAsync(same(HEARTBEAT),
+ any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
+ }
}
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java
index af7ff5c..605711c 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java
@@ -25,10 +25,14 @@
*/
package org.forgerock.opendj.ldap;
+import static java.util.concurrent.Executors.callable;
+
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -43,11 +47,74 @@
*/
final class MockScheduler implements ScheduledExecutorService {
- // Saved scheduled task.
- private Runnable command;
- private long delay;
- private boolean isScheduled = false;
- private TimeUnit unit;
+ private final class ScheduledCallableFuture<T> implements ScheduledFuture<T>, Callable<T> {
+ private final Callable<T> callable;
+ private final CountDownLatch isDone = new CountDownLatch(1);
+ private final boolean removeAfterCall;
+ private T result = null;
+
+ private ScheduledCallableFuture(final Callable<T> callable, final boolean removeAfterCall) {
+ this.callable = callable;
+ this.removeAfterCall = removeAfterCall;
+ }
+
+ @Override
+ public T call() throws Exception {
+ result = callable.call();
+ isDone.countDown();
+ if (removeAfterCall) {
+ tasks.remove(this);
+ }
+ return result;
+ }
+
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ return tasks.remove(this);
+ }
+
+ @Override
+ public int compareTo(final Delayed o) {
+ // Unused.
+ return 0;
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ isDone.await();
+ return result;
+ }
+
+ @Override
+ public T get(final long timeout, final TimeUnit unit) throws InterruptedException,
+ ExecutionException, TimeoutException {
+ if (isDone.await(timeout, unit)) {
+ return result;
+ } else {
+ throw new TimeoutException();
+ }
+ }
+
+ @Override
+ public long getDelay(final TimeUnit unit) {
+ // Unused.
+ return 0;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return tasks.contains(this);
+ }
+
+ @Override
+ public boolean isDone() {
+ return isDone.getCount() == 0;
+ }
+
+ }
+
+ // Saved scheduled tasks.
+ private final List<Callable<?>> tasks = new CopyOnWriteArrayList<Callable<?>>();
MockScheduler() {
// Nothing to do.
@@ -109,74 +176,24 @@
@Override
public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay,
final TimeUnit unit) {
- // Unused.
- return null;
+ return onceOnly(callable);
}
@Override
public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
- // Unused.
- return null;
+ return onceOnly(callable(command));
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay,
final long period, final TimeUnit unit) {
- // Unused.
- return null;
+ return repeated(callable(command));
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command,
final long initialDelay, final long delay, final TimeUnit unit) {
- this.command = command;
- this.delay = delay;
- this.unit = unit;
- this.isScheduled = true;
- return new ScheduledFuture<Object>() {
- @Override
- public boolean cancel(final boolean mayInterruptIfRunning) {
- isScheduled = false;
- return true;
- }
-
- @Override
- public int compareTo(final Delayed o) {
- // Unused.
- return 0;
- }
-
- @Override
- public Object get() throws InterruptedException, ExecutionException {
- // Unused.
- return null;
- }
-
- @Override
- public Object get(final long timeout, final TimeUnit unit) throws InterruptedException,
- ExecutionException, TimeoutException {
- // Unused.
- return null;
- }
-
- @Override
- public long getDelay(final TimeUnit unit) {
- // Unused.
- return 0;
- }
-
- @Override
- public boolean isCancelled() {
- return !isScheduled;
- }
-
- @Override
- public boolean isDone() {
- // Unused.
- return false;
- }
-
- };
+ return repeated(callable(command));
}
@Override
@@ -192,35 +209,62 @@
@Override
public <T> Future<T> submit(final Callable<T> task) {
- // Unused.
- return null;
+ return onceOnly(task);
}
@Override
public Future<?> submit(final Runnable task) {
- // Unused.
- return null;
+ return onceOnly(callable(task));
}
@Override
public <T> Future<T> submit(final Runnable task, final T result) {
- // Unused.
- return null;
+ return onceOnly(callable(task, result));
}
- Runnable getCommand() {
- return command;
+ List<Callable<?>> getAllTasks() {
+ return tasks;
}
- long getDelay() {
- return delay;
- }
-
- TimeUnit getUnit() {
- return unit;
+ Callable<?> getFirstTask() {
+ return tasks.get(0);
}
boolean isScheduled() {
- return isScheduled;
+ return !tasks.isEmpty();
+ }
+
+ void runAllTasks() {
+ for (final Callable<?> task : tasks) {
+ runTask0(task);
+ }
+ }
+
+ void runFirstTask() {
+ runTask(0);
+ }
+
+ void runTask(final int i) {
+ runTask0(tasks.get(i));
+ }
+
+ private <T> ScheduledCallableFuture<T> onceOnly(final Callable<T> callable) {
+ final ScheduledCallableFuture<T> wrapped = new ScheduledCallableFuture<T>(callable, true);
+ tasks.add(wrapped);
+ return wrapped;
+ }
+
+ private <T> ScheduledCallableFuture<T> repeated(final Callable<T> callable) {
+ final ScheduledCallableFuture<T> wrapped = new ScheduledCallableFuture<T>(callable, false);
+ tasks.add(wrapped);
+ return wrapped;
+ }
+
+ private void runTask0(final Callable<?> task) {
+ try {
+ task.call();
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/SdkTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/SdkTestCase.java
index 48facfa..708776d 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/SdkTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/SdkTestCase.java
@@ -33,7 +33,6 @@
* An abstract class that all types unit tests should extend. A type represents
* the classes found directly under the package org.forgerock.opendj.ldap.
*/
-
@Test(groups = { "precommit", "types", "sdk" })
public abstract class SdkTestCase extends ForgeRockTestCase {
}
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtils.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtils.java
index 33b34f5..2a8c3bd 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtils.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtils.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions copyright 2012 ForgeRock AS.
+ * Portions copyright 2012-2013 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
@@ -42,8 +42,10 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
import com.forgerock.opendj.util.CompletedFutureResult;
+import com.forgerock.opendj.util.TimeSource;
/**
* This class defines some utility functions which can be used by test cases.
@@ -202,4 +204,22 @@
return mockConnection;
}
+
+ /**
+ * Returns a mock {@link TimeSource} which can be used for injecting fake
+ * time stamps into components.
+ *
+ * @param times
+ * The times in milli-seconds which should be returned by the
+ * time source.
+ * @return The mock time source.
+ */
+ public static TimeSource mockTimeSource(final long... times) {
+ final TimeSource mock = mock(TimeSource.class);
+ OngoingStubbing<Long> stubbing = when(mock.currentTimeMillis());
+ for (long t : times) {
+ stubbing = stubbing.thenReturn(t);
+ }
+ return mock;
+ }
}
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtilsTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtilsTestCase.java
new file mode 100644
index 0000000..3cc8944
--- /dev/null
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtilsTestCase.java
@@ -0,0 +1,53 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS
+ */
+package org.forgerock.opendj.ldap;
+
+import static org.fest.assertions.Assertions.assertThat;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockTimeSource;
+
+import org.testng.annotations.Test;
+
+import com.forgerock.opendj.util.TimeSource;
+
+@SuppressWarnings("javadoc")
+public class TestCaseUtilsTestCase extends SdkTestCase {
+
+ /**
+ * Test for {@link #mockTimeSource(long...)}.
+ */
+ @Test
+ public void testMockTimeSource() {
+ final TimeSource mock1 = mockTimeSource(10);
+ assertThat(mock1.currentTimeMillis()).isEqualTo(10);
+ assertThat(mock1.currentTimeMillis()).isEqualTo(10);
+
+ final TimeSource mock2 = mockTimeSource(10, 20, 30);
+ assertThat(mock2.currentTimeMillis()).isEqualTo(10);
+ assertThat(mock2.currentTimeMillis()).isEqualTo(20);
+ assertThat(mock2.currentTimeMillis()).isEqualTo(30);
+ assertThat(mock2.currentTimeMillis()).isEqualTo(30);
+ }
+}
diff --git a/opendj3/opendj-rest2ldap-servlet/src/main/webapp/opendj-rest2ldap-servlet.json b/opendj3/opendj-rest2ldap-servlet/src/main/webapp/opendj-rest2ldap-servlet.json
index 7c2e135..0381038 100644
--- a/opendj3/opendj-rest2ldap-servlet/src/main/webapp/opendj-rest2ldap-servlet.json
+++ b/opendj3/opendj-rest2ldap-servlet/src/main/webapp/opendj-rest2ldap-servlet.json
@@ -35,8 +35,10 @@
// Re-usable pool of 24 connections per server.
"connectionPoolSize" : 24,
- // Check pooled connections are alive every 30 seconds.
- "heartBeatIntervalSeconds" : 30,
+ // Check pooled connections are alive every 30 seconds with a 500ms
+ // heart beat timeout.
+ "heartBeatIntervalSeconds" : 30,
+ "heartBeatTimeoutMilliSeconds" : 500,
// The preferred load-balancing pool.
"primaryLDAPServers" : [
diff --git a/opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java b/opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
index 7f84a81..edb811a 100644
--- a/opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
+++ b/opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
@@ -970,6 +970,9 @@
Math.max(configuration.get("connectionPoolSize").defaultTo(10).asInteger(), 1);
final int heartBeatIntervalSeconds =
Math.max(configuration.get("heartBeatIntervalSeconds").defaultTo(30).asInteger(), 1);
+ final int heartBeatTimeoutMilliSeconds =
+ Math.max(configuration.get("heartBeatTimeoutMilliSeconds").defaultTo(500)
+ .asInteger(), 100);
// Parse authentication parameters.
final BindRequest bindRequest;
@@ -1036,7 +1039,7 @@
}
final ConnectionFactory primary =
parseLDAPServers(primaryLDAPServers, bindRequest, connectionPoolSize,
- heartBeatIntervalSeconds, options);
+ heartBeatIntervalSeconds, heartBeatTimeoutMilliSeconds, options);
// Parse secondary data center(s).
final JsonValue secondaryLDAPServers = configuration.get("secondaryLDAPServers");
@@ -1045,7 +1048,7 @@
if (secondaryLDAPServers.size() > 0) {
secondary =
parseLDAPServers(secondaryLDAPServers, bindRequest, connectionPoolSize,
- heartBeatIntervalSeconds, options);
+ heartBeatIntervalSeconds, heartBeatTimeoutMilliSeconds, options);
} else {
secondary = null;
}
@@ -1093,20 +1096,22 @@
private static ConnectionFactory parseLDAPServers(final JsonValue config,
final BindRequest bindRequest, final int connectionPoolSize,
- final int heartBeatIntervalSeconds, final LDAPOptions options) {
+ final int heartBeatIntervalSeconds, final int heartBeatTimeoutMilliSeconds,
+ final LDAPOptions options) {
final List<ConnectionFactory> servers = new ArrayList<ConnectionFactory>(config.size());
for (final JsonValue server : config) {
final String host = server.get("hostname").required().asString();
final int port = server.get("port").required().asInteger();
ConnectionFactory factory = new LDAPConnectionFactory(host, port, options);
+ factory =
+ Connections.newHeartBeatConnectionFactory(factory,
+ heartBeatIntervalSeconds * 1000, heartBeatTimeoutMilliSeconds,
+ TimeUnit.MILLISECONDS);
if (bindRequest != null) {
factory = Connections.newAuthenticatedConnectionFactory(factory, bindRequest);
}
if (connectionPoolSize > 1) {
factory =
- Connections.newHeartBeatConnectionFactory(factory,
- heartBeatIntervalSeconds, TimeUnit.SECONDS);
- factory =
Connections.newCachedConnectionPool(factory, 0, connectionPoolSize, 60L,
TimeUnit.SECONDS);
}
--
Gitblit v1.10.0