From 3f7ddbf313aaabbfba4650cb2036cb41e51a9bde Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 18 Apr 2013 11:37:28 +0000
Subject: [PATCH] Fix OPENDJ-838: Add ConnectionFactory.close() method to facilitate resource cleanup after application exit
---
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java | 152 +++++++++++++++++++++-----------------------------
1 files changed, 63 insertions(+), 89 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
index aead5f3..a868df4 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
@@ -22,11 +22,13 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS.
+ * Portions copyright 2011-2013 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
import static org.forgerock.opendj.ldap.ErrorResultException.*;
import java.util.ArrayList;
@@ -39,7 +41,7 @@
import java.util.logging.Level;
import com.forgerock.opendj.util.AsynchronousFutureResult;
-import com.forgerock.opendj.util.StaticUtils;
+import com.forgerock.opendj.util.ReferenceCountedObject;
import com.forgerock.opendj.util.Validator;
/**
@@ -55,11 +57,8 @@
ResultHandler<Connection> {
private final ConnectionFactory factory;
-
private final AtomicBoolean isOperational = new AtomicBoolean(true);
-
private volatile FutureResult<?> pendingConnectFuture = null;
-
private final int index;
private MonitoredConnectionFactory(final ConnectionFactory factory, final int index) {
@@ -67,9 +66,12 @@
this.index = index;
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void close() {
+ // Should we cancel the future?
+ factory.close();
+ }
+
@Override
public Connection getConnection() throws ErrorResultException {
final Connection connection;
@@ -87,14 +89,12 @@
return connection;
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> resultHandler) {
final AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> future =
- new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(resultHandler);
+ new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(
+ resultHandler);
final ResultHandler<Connection> failoverHandler = new ResultHandler<Connection>() {
@Override
@@ -141,9 +141,6 @@
connection.close();
}
- /**
- * {@inheritDoc}
- */
@Override
public String toString() {
return factory.toString();
@@ -156,9 +153,9 @@
private synchronized void checkIfAvailable() {
if (!isOperational.get()
&& (pendingConnectFuture == null || pendingConnectFuture.isDone())) {
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
- StaticUtils.DEBUG_LOG.fine(String
- .format("Attempting reconnect to offline factory " + this));
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format("Attempting reconnect to offline factory '%s'",
+ this));
}
pendingConnectFuture = factory.getConnectionAsync(this);
}
@@ -167,21 +164,22 @@
private void notifyOffline(final ErrorResultException error) {
if (isOperational.getAndSet(false)) {
// Transition from online to offline.
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) {
- StaticUtils.DEBUG_LOG.warning(String.format("Connection factory " + factory
- + " is no longer operational: " + error.getMessage()));
+ if (DEBUG_LOG.isLoggable(Level.WARNING)) {
+ DEBUG_LOG.warning(String.format(
+ "Connection factory '%s' is no longer operational: %s", factory, error
+ .getMessage()));
}
synchronized (stateLock) {
offlineFactoriesCount++;
if (offlineFactoriesCount == 1) {
// Enable monitoring.
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
- StaticUtils.DEBUG_LOG.fine(String.format("Starting monitoring thread"));
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format("Starting monitoring thread"));
}
monitoringFuture =
- scheduler.scheduleWithFixedDelay(new MonitorRunnable(), 0,
+ scheduler.get().scheduleWithFixedDelay(new MonitorRunnable(), 0,
monitoringInterval, monitoringIntervalTimeUnit);
}
}
@@ -191,16 +189,16 @@
private void notifyOnline() {
if (!isOperational.getAndSet(true)) {
// Transition from offline to online.
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.INFO)) {
- StaticUtils.DEBUG_LOG.info(String.format("Connection factory " + factory
- + " is now operational"));
+ if (DEBUG_LOG.isLoggable(Level.INFO)) {
+ DEBUG_LOG.info(String.format("Connection factory'%s' is now operational",
+ factory));
}
synchronized (stateLock) {
offlineFactoriesCount--;
if (offlineFactoriesCount == 0) {
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
- StaticUtils.DEBUG_LOG.fine(String.format("Stopping monitoring thread"));
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format("Stopping monitoring thread"));
}
monitoringFuture.cancel(false);
@@ -225,91 +223,65 @@
}
private final List<MonitoredConnectionFactory> monitoredFactories;
-
- private final ScheduledExecutorService scheduler;
-
+ private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
private final Object stateLock = new Object();
-
- // Guarded by stateLock.
- private int offlineFactoriesCount = 0;
-
- private final long monitoringInterval;
-
- private final TimeUnit monitoringIntervalTimeUnit;
-
- // Guarded by stateLock.
- private ScheduledFuture<?> monitoringFuture;
-
/**
- * Creates a new abstract load balancing algorithm which will monitor
- * offline connection factories every second using the default scheduler.
- *
- * @param factories
- * The connection factories.
+ * Guarded by stateLock.
*/
+ private int offlineFactoriesCount = 0;
+ private final long monitoringInterval;
+ private final TimeUnit monitoringIntervalTimeUnit;
+ /**
+ * Guarded by stateLock.
+ */
+ private ScheduledFuture<?> monitoringFuture;
+ private AtomicBoolean isClosed = new AtomicBoolean();
+
AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories) {
- this(factories, 1, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler());
+ this(factories, 1, TimeUnit.SECONDS, null);
}
- /**
- * Creates a new abstract load balancing algorithm which will monitor
- * offline connection factories using the specified frequency using the
- * default scheduler.
- *
- * @param factories
- * The connection factories.
- * @param interval
- * The interval between attempts to poll offline factories.
- * @param unit
- * The time unit for the interval between attempts to poll
- * offline factories.
- */
AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
final long interval, final TimeUnit unit) {
- this(factories, interval, unit, StaticUtils.getDefaultScheduler());
+ this(factories, interval, unit, null);
}
- /**
- * Creates a new abstract load balancing algorithm which will monitor
- * offline connection factories using the specified frequency and scheduler.
- *
- * @param factories
- * The connection factories.
- * @param interval
- * The interval between attempts to poll offline factories.
- * @param unit
- * The time unit for the interval between attempts to poll
- * offline factories.
- * @param scheduler
- * The scheduler which should for periodically monitoring dead
- * connection factories to see if they are usable again.
- */
AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
final long interval, final TimeUnit unit, final ScheduledExecutorService scheduler) {
- Validator.ensureNotNull(factories, scheduler, unit);
+ Validator.ensureNotNull(factories, unit);
this.monitoredFactories = new ArrayList<MonitoredConnectionFactory>(factories.size());
int i = 0;
for (final ConnectionFactory f : factories) {
this.monitoredFactories.add(new MonitoredConnectionFactory(f, i++));
}
- this.scheduler = scheduler;
+ this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
this.monitoringInterval = interval;
this.monitoringIntervalTimeUnit = unit;
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void close() {
+ if (isClosed.compareAndSet(false, true)) {
+ synchronized (stateLock) {
+ if (monitoringFuture != null) {
+ monitoringFuture.cancel(false);
+ monitoringFuture = null;
+ }
+ }
+ for (ConnectionFactory factory : monitoredFactories) {
+ factory.close();
+ }
+ scheduler.release();
+ }
+ }
+
@Override
public final ConnectionFactory getConnectionFactory() throws ErrorResultException {
final int index = getInitialConnectionFactoryIndex();
return getMonitoredConnectionFactory(index);
}
- /**
- * {@inheritDoc}
- */
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
@@ -357,9 +329,11 @@
index = (index + 1) % maxIndex;
} while (index != initialIndex);
- // All factories are offline so give up. We could have a
- // configurable policy here such as waiting indefinitely, or for a
- // configurable timeout period.
+ /*
+ * All factories are offline so give up. We could have a configurable
+ * policy here such as waiting indefinitely, or for a configurable
+ * timeout period.
+ */
throw newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR,
"No operational connection factories available");
}
--
Gitblit v1.10.0