From 3f7ddbf313aaabbfba4650cb2036cb41e51a9bde Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 18 Apr 2013 11:37:28 +0000
Subject: [PATCH] Fix OPENDJ-838: Add ConnectionFactory.close() method to facilitate resource cleanup after application exit
---
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java | 201 ++++++++++++++++++++++++--------------------------
1 files changed, 97 insertions(+), 104 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
index 9f73c51..669195f 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS.
+ * Portions copyright 2011-2013 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
@@ -40,6 +40,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.logging.Level;
@@ -64,7 +65,7 @@
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.FutureResultTransformer;
-import com.forgerock.opendj.util.StaticUtils;
+import com.forgerock.opendj.util.ReferenceCountedObject;
import com.forgerock.opendj.util.Validator;
/**
@@ -75,7 +76,7 @@
/**
* A connection that sends heart beats and supports all operations.
*/
- private final class ConnectionImpl extends AbstractConnectionWrapper implements
+ private final class ConnectionImpl extends AbstractConnectionWrapper<Connection> implements
ConnectionEventListener, SearchResultHandler {
/**
@@ -85,9 +86,8 @@
* @param <R>
* The type of result returned by the request.
*/
- private abstract class DelayedFuture<R extends Result>
- extends AsynchronousFutureResult<R, ResultHandler<? super R>>
- implements Runnable {
+ private abstract class DelayedFuture<R extends Result> extends
+ AsynchronousFutureResult<R, ResultHandler<? super R>> implements Runnable {
private volatile FutureResult<R> innerFuture = null;
protected DelayedFuture(final ResultHandler<? super R> handler) {
@@ -123,14 +123,19 @@
}
- // List of pending Bind or StartTLS requests which must be invoked
- // when the current heart beat completes.
+ /*
+ * List of pending Bind or StartTLS requests which must be invoked when
+ * the current heart beat completes.
+ */
private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>();
- // Coordinates heart-beats with Bind and StartTLS requests.
+ /* Coordinates heart-beats with Bind and StartTLS requests. */
private final Sync sync = new Sync();
- // Timestamp of last response received (any response, not just heart beats).
+ /*
+ * Timestamp of last response received (any response, not just heart
+ * beats).
+ */
private volatile long timestamp = currentTimeMillis(); // Assume valid at creation.
private ConnectionImpl(final Connection connection) {
@@ -206,8 +211,10 @@
return connection.bindAsync(request, intermediateResponseHandler, timestamper(
resultHandler, true));
} else {
- // A heart beat must be in progress so create a runnable task
- // which will be executed when the heart beat completes.
+ /*
+ * A heart beat must be in progress so create a runnable task
+ * which will be executed when the heart beat completes.
+ */
final DelayedFuture<BindResult> future =
new DelayedFuture<BindResult>(resultHandler) {
@Override
@@ -216,7 +223,10 @@
timestamper(this, true));
}
};
- // Enqueue and flush if the heart beat has completed in the mean time.
+ /*
+ * Enqueue and flush if the heart beat has completed in the mean
+ * time.
+ */
pendingRequests.offer(future);
flushPendingRequests();
return future;
@@ -342,8 +352,11 @@
return connection.extendedRequestAsync(request, intermediateResponseHandler,
timestamper(resultHandler, true));
} else {
- // A heart beat must be in progress so create a runnable task
- // which will be executed when the heart beat completes.
+ /*
+ * A heart beat must be in progress so create a runnable
+ * task which will be executed when the heart beat
+ * completes.
+ */
final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
@Override
public FutureResult<R> dispatch() {
@@ -351,7 +364,11 @@
intermediateResponseHandler, timestamper(this, true));
}
};
- // Enqueue and flush if the heart beat has completed in the mean time.
+
+ /*
+ * Enqueue and flush if the heart beat has completed in the
+ * mean time.
+ */
pendingRequests.offer(future);
flushPendingRequests();
return future;
@@ -382,7 +399,7 @@
@Override
public void handleErrorResult(final ErrorResultException error) {
if (DEBUG_LOG.isLoggable(Level.FINE)) {
- DEBUG_LOG.fine(String.format("Heartbeat failed: " + error.getMessage()));
+ DEBUG_LOG.fine(String.format("Heartbeat failed: %s", error.getMessage()));
}
updateTimestamp();
releaseHeartBeatLock();
@@ -582,8 +599,10 @@
}
private void acquireBindOrStartTLSLock() throws ErrorResultException {
- // Wait for pending heartbeats and prevent new heartbeats from
- // being sent while the bind is in progress.
+ /*
+ * Wait for pending heartbeats and prevent new heartbeats from being
+ * sent while the bind is in progress.
+ */
try {
if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) {
// Give up - it looks like the connection is dead.
@@ -597,8 +616,11 @@
private void flushPendingRequests() {
if (!pendingRequests.isEmpty()) {
- // The pending requests will acquire the shared lock, but we take
- // it here anyway to ensure that pending requests do not get blocked.
+ /*
+ * The pending requests will acquire the shared lock, but we
+ * take it here anyway to ensure that pending requests do not
+ * get blocked.
+ */
if (sync.tryLockShared()) {
try {
Runnable pendingRequest;
@@ -617,7 +639,10 @@
connection.removeConnectionEventListener(this);
activeConnections.remove(this);
if (activeConnections.isEmpty()) {
- // This is the last active connection, so stop the heartbeat.
+ /*
+ * This is the last active connection, so stop the
+ * heartbeat.
+ */
heartBeatFuture.cancel(false);
}
}
@@ -633,22 +658,33 @@
}
private void sendHeartBeat() {
- // Only send the heartbeat if the connection has been idle for some time.
+ /*
+ * Only send the heartbeat if the connection has been idle for some
+ * time.
+ */
if (currentTimeMillis() < (timestamp + minDelayMS)) {
return;
}
- // Don't send a heart beat if there is already a heart beat,
- // bind, or startTLS in progress. Note that the bind/startTLS
- // response will update the timestamp as if it were a heart beat.
+ /*
+ * Don't send a heart beat if there is already a heart beat, bind,
+ * or startTLS in progress. Note that the bind/startTLS response
+ * will update the timestamp as if it were a heart beat.
+ */
if (sync.tryLockExclusively()) {
try {
connection.searchAsync(heartBeatRequest, null, this);
} catch (final Exception e) {
- // This may happen when we attempt to send the heart beat just
- // after the connection is closed but before we are notified.
+ /*
+ * This may happen when we attempt to send the heart beat
+ * just after the connection is closed but before we are
+ * notified.
+ */
- // Release the lock because we're never going to get a response.
+ /*
+ * Release the lock because we're never going to get a
+ * response.
+ */
releaseHeartBeatLock();
}
}
@@ -755,7 +791,7 @@
* </ul>
*/
private static final class Sync extends AbstractQueuedSynchronizer {
- // Lock states. Positive values indicate that the shared lock is taken.
+ /* Lock states. Positive values indicate that the shared lock is taken. */
private static final int UNLOCKED = 0; // initial state
private static final int LOCKED_EXCLUSIVELY = -1;
@@ -809,8 +845,10 @@
}
final int newState = state - 1;
if (compareAndSetState(state, newState)) {
- // We could always return true here, but since there cannot
- // be waiting readers we can specialize for waiting writers.
+ /*
+ * We could always return true here, but since there cannot
+ * be waiting readers we can specialize for waiting writers.
+ */
return newState == UNLOCKED;
}
}
@@ -851,83 +889,29 @@
private final SearchRequest heartBeatRequest;
private final long interval;
private final long minDelayMS;
- private final ScheduledExecutorService scheduler;
+ private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
private final long timeoutMS;
private final TimeUnit unit;
+ private AtomicBoolean isClosed = new AtomicBoolean();
- /**
- * Creates a new heart-beat connection factory which will create connections
- * using the provided connection factory and periodically ping any created
- * connections in order to detect that they are still alive every 10 seconds
- * using the default scheduler.
- *
- * @param factory
- * The connection factory to use for creating connections.
- */
HeartBeatConnectionFactory(final ConnectionFactory factory) {
- this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler());
+ this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, null);
}
- /**
- * Creates a new heart-beat connection factory which will create connections
- * using the provided connection factory and periodically ping any created
- * connections in order to detect that they are still alive using the
- * specified frequency and the default scheduler.
- *
- * @param factory
- * The connection factory to use for creating connections.
- * @param interval
- * The interval between keepalive pings.
- * @param unit
- * The time unit for the interval between keepalive pings.
- */
HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
final TimeUnit unit) {
- this(factory, interval, unit, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler());
+ this(factory, interval, unit, DEFAULT_SEARCH, null);
}
- /**
- * Creates a new heart-beat connection factory which will create connections
- * using the provided connection factory and periodically ping any created
- * connections using the specified search request in order to detect that
- * they are still alive.
- *
- * @param factory
- * The connection factory to use for creating connections.
- * @param interval
- * The interval between keepalive pings.
- * @param unit
- * The time unit for the interval between keepalive pings.
- * @param heartBeat
- * The search request to use for keepalive pings.
- */
HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
final TimeUnit unit, final SearchRequest heartBeat) {
- this(factory, interval, unit, heartBeat, StaticUtils.getDefaultScheduler());
+ this(factory, interval, unit, heartBeat, null);
}
- /**
- * Creates a new heart-beat connection factory which will create connections
- * using the provided connection factory and periodically ping any created
- * connections using the specified search request in order to detect that
- * they are still alive.
- *
- * @param factory
- * The connection factory to use for creating connections.
- * @param interval
- * The interval between keepalive pings.
- * @param unit
- * The time unit for the interval between keepalive pings.
- * @param heartBeat
- * The search request to use for keepalive pings.
- * @param scheduler
- * The scheduler which should for periodically sending keepalive
- * pings.
- */
HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
final TimeUnit unit, final SearchRequest heartBeat,
final ScheduledExecutorService scheduler) {
- Validator.ensureNotNull(factory, heartBeat, unit, scheduler);
+ Validator.ensureNotNull(factory, heartBeat, unit);
Validator.ensureTrue(interval >= 0, "negative timeout");
this.heartBeatRequest = heartBeat;
@@ -935,22 +919,34 @@
this.unit = unit;
this.activeConnections = new LinkedList<ConnectionImpl>();
this.factory = factory;
- this.scheduler = scheduler;
+ this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
this.timeoutMS = unit.toMillis(interval) * 2;
this.minDelayMS = unit.toMillis(interval) / 2;
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void close() {
+ if (isClosed.compareAndSet(false, true)) {
+ synchronized (activeConnections) {
+ if (!activeConnections.isEmpty()) {
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format(
+ "HeartbeatConnectionFactory '%s' is closing while %d "
+ + "active connections remain", toString(),
+ activeConnections.size()));
+ }
+ }
+ }
+ scheduler.release();
+ factory.close();
+ }
+ }
+
@Override
public Connection getConnection() throws ErrorResultException {
return adaptConnection(factory.getConnection());
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> handler) {
@@ -967,9 +963,6 @@
return future;
}
- /**
- * {@inheritDoc}
- */
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
@@ -984,8 +977,8 @@
synchronized (activeConnections) {
connection.addConnectionEventListener(heartBeatConnection);
if (activeConnections.isEmpty()) {
- // This is the first active connection, so start the heart beat.
- heartBeatFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
+ /* This is the first active connection, so start the heart beat. */
+ heartBeatFuture = scheduler.get().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
final ConnectionImpl[] tmp;
--
Gitblit v1.10.0