From 3f7ddbf313aaabbfba4650cb2036cb41e51a9bde Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 18 Apr 2013 11:37:28 +0000
Subject: [PATCH] Fix OPENDJ-838: Add ConnectionFactory.close() method to facilitate resource cleanup after application exit

---
 opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java |  152 +++++++++++++++++++++-----------------------------
 1 files changed, 63 insertions(+), 89 deletions(-)

diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
index aead5f3..a868df4 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
@@ -22,11 +22,13 @@
  *
  *
  *      Copyright 2010 Sun Microsystems, Inc.
- *      Portions copyright 2011-2012 ForgeRock AS.
+ *      Portions copyright 2011-2013 ForgeRock AS.
  */
 
 package org.forgerock.opendj.ldap;
 
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
 import static org.forgerock.opendj.ldap.ErrorResultException.*;
 
 import java.util.ArrayList;
@@ -39,7 +41,7 @@
 import java.util.logging.Level;
 
 import com.forgerock.opendj.util.AsynchronousFutureResult;
-import com.forgerock.opendj.util.StaticUtils;
+import com.forgerock.opendj.util.ReferenceCountedObject;
 import com.forgerock.opendj.util.Validator;
 
 /**
@@ -55,11 +57,8 @@
             ResultHandler<Connection> {
 
         private final ConnectionFactory factory;
-
         private final AtomicBoolean isOperational = new AtomicBoolean(true);
-
         private volatile FutureResult<?> pendingConnectFuture = null;
-
         private final int index;
 
         private MonitoredConnectionFactory(final ConnectionFactory factory, final int index) {
@@ -67,9 +66,12 @@
             this.index = index;
         }
 
-        /**
-         * {@inheritDoc}
-         */
+        @Override
+        public void close() {
+            // Should we cancel the future?
+            factory.close();
+        }
+
         @Override
         public Connection getConnection() throws ErrorResultException {
             final Connection connection;
@@ -87,14 +89,12 @@
             return connection;
         }
 
-        /**
-         * {@inheritDoc}
-         */
         @Override
         public FutureResult<Connection> getConnectionAsync(
                 final ResultHandler<? super Connection> resultHandler) {
             final AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> future =
-                   new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(resultHandler);
+                    new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(
+                            resultHandler);
 
             final ResultHandler<Connection> failoverHandler = new ResultHandler<Connection>() {
                 @Override
@@ -141,9 +141,6 @@
             connection.close();
         }
 
-        /**
-         * {@inheritDoc}
-         */
         @Override
         public String toString() {
             return factory.toString();
@@ -156,9 +153,9 @@
         private synchronized void checkIfAvailable() {
             if (!isOperational.get()
                     && (pendingConnectFuture == null || pendingConnectFuture.isDone())) {
-                if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
-                    StaticUtils.DEBUG_LOG.fine(String
-                            .format("Attempting reconnect to offline factory " + this));
+                if (DEBUG_LOG.isLoggable(Level.FINE)) {
+                    DEBUG_LOG.fine(String.format("Attempting reconnect to offline factory '%s'",
+                            this));
                 }
                 pendingConnectFuture = factory.getConnectionAsync(this);
             }
@@ -167,21 +164,22 @@
         private void notifyOffline(final ErrorResultException error) {
             if (isOperational.getAndSet(false)) {
                 // Transition from online to offline.
-                if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) {
-                    StaticUtils.DEBUG_LOG.warning(String.format("Connection factory " + factory
-                            + " is no longer operational: " + error.getMessage()));
+                if (DEBUG_LOG.isLoggable(Level.WARNING)) {
+                    DEBUG_LOG.warning(String.format(
+                            "Connection factory '%s' is no longer operational: %s", factory, error
+                                    .getMessage()));
                 }
 
                 synchronized (stateLock) {
                     offlineFactoriesCount++;
                     if (offlineFactoriesCount == 1) {
                         // Enable monitoring.
-                        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
-                            StaticUtils.DEBUG_LOG.fine(String.format("Starting monitoring thread"));
+                        if (DEBUG_LOG.isLoggable(Level.FINE)) {
+                            DEBUG_LOG.fine(String.format("Starting monitoring thread"));
                         }
 
                         monitoringFuture =
-                                scheduler.scheduleWithFixedDelay(new MonitorRunnable(), 0,
+                                scheduler.get().scheduleWithFixedDelay(new MonitorRunnable(), 0,
                                         monitoringInterval, monitoringIntervalTimeUnit);
                     }
                 }
@@ -191,16 +189,16 @@
         private void notifyOnline() {
             if (!isOperational.getAndSet(true)) {
                 // Transition from offline to online.
-                if (StaticUtils.DEBUG_LOG.isLoggable(Level.INFO)) {
-                    StaticUtils.DEBUG_LOG.info(String.format("Connection factory " + factory
-                            + " is now operational"));
+                if (DEBUG_LOG.isLoggable(Level.INFO)) {
+                    DEBUG_LOG.info(String.format("Connection factory'%s' is now operational",
+                            factory));
                 }
 
                 synchronized (stateLock) {
                     offlineFactoriesCount--;
                     if (offlineFactoriesCount == 0) {
-                        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
-                            StaticUtils.DEBUG_LOG.fine(String.format("Stopping monitoring thread"));
+                        if (DEBUG_LOG.isLoggable(Level.FINE)) {
+                            DEBUG_LOG.fine(String.format("Stopping monitoring thread"));
                         }
 
                         monitoringFuture.cancel(false);
@@ -225,91 +223,65 @@
     }
 
     private final List<MonitoredConnectionFactory> monitoredFactories;
-
-    private final ScheduledExecutorService scheduler;
-
+    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
     private final Object stateLock = new Object();
-
-    // Guarded by stateLock.
-    private int offlineFactoriesCount = 0;
-
-    private final long monitoringInterval;
-
-    private final TimeUnit monitoringIntervalTimeUnit;
-
-    // Guarded by stateLock.
-    private ScheduledFuture<?> monitoringFuture;
-
     /**
-     * Creates a new abstract load balancing algorithm which will monitor
-     * offline connection factories every second using the default scheduler.
-     *
-     * @param factories
-     *            The connection factories.
+     * Guarded by stateLock.
      */
+    private int offlineFactoriesCount = 0;
+    private final long monitoringInterval;
+    private final TimeUnit monitoringIntervalTimeUnit;
+    /**
+     * Guarded by stateLock.
+     */
+    private ScheduledFuture<?> monitoringFuture;
+    private AtomicBoolean isClosed = new AtomicBoolean();
+
     AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories) {
-        this(factories, 1, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler());
+        this(factories, 1, TimeUnit.SECONDS, null);
     }
 
-    /**
-     * Creates a new abstract load balancing algorithm which will monitor
-     * offline connection factories using the specified frequency using the
-     * default scheduler.
-     *
-     * @param factories
-     *            The connection factories.
-     * @param interval
-     *            The interval between attempts to poll offline factories.
-     * @param unit
-     *            The time unit for the interval between attempts to poll
-     *            offline factories.
-     */
     AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
             final long interval, final TimeUnit unit) {
-        this(factories, interval, unit, StaticUtils.getDefaultScheduler());
+        this(factories, interval, unit, null);
     }
 
-    /**
-     * Creates a new abstract load balancing algorithm which will monitor
-     * offline connection factories using the specified frequency and scheduler.
-     *
-     * @param factories
-     *            The connection factories.
-     * @param interval
-     *            The interval between attempts to poll offline factories.
-     * @param unit
-     *            The time unit for the interval between attempts to poll
-     *            offline factories.
-     * @param scheduler
-     *            The scheduler which should for periodically monitoring dead
-     *            connection factories to see if they are usable again.
-     */
     AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
             final long interval, final TimeUnit unit, final ScheduledExecutorService scheduler) {
-        Validator.ensureNotNull(factories, scheduler, unit);
+        Validator.ensureNotNull(factories, unit);
 
         this.monitoredFactories = new ArrayList<MonitoredConnectionFactory>(factories.size());
         int i = 0;
         for (final ConnectionFactory f : factories) {
             this.monitoredFactories.add(new MonitoredConnectionFactory(f, i++));
         }
-        this.scheduler = scheduler;
+        this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
         this.monitoringInterval = interval;
         this.monitoringIntervalTimeUnit = unit;
     }
 
-    /**
-     * {@inheritDoc}
-     */
+    @Override
+    public void close() {
+        if (isClosed.compareAndSet(false, true)) {
+            synchronized (stateLock) {
+                if (monitoringFuture != null) {
+                    monitoringFuture.cancel(false);
+                    monitoringFuture = null;
+                }
+            }
+            for (ConnectionFactory factory : monitoredFactories) {
+                factory.close();
+            }
+            scheduler.release();
+        }
+    }
+
     @Override
     public final ConnectionFactory getConnectionFactory() throws ErrorResultException {
         final int index = getInitialConnectionFactoryIndex();
         return getMonitoredConnectionFactory(index);
     }
 
-    /**
-     * {@inheritDoc}
-     */
     @Override
     public String toString() {
         final StringBuilder builder = new StringBuilder();
@@ -357,9 +329,11 @@
             index = (index + 1) % maxIndex;
         } while (index != initialIndex);
 
-        // All factories are offline so give up. We could have a
-        // configurable policy here such as waiting indefinitely, or for a
-        // configurable timeout period.
+        /*
+         * All factories are offline so give up. We could have a configurable
+         * policy here such as waiting indefinitely, or for a configurable
+         * timeout period.
+         */
         throw newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR,
                 "No operational connection factories available");
     }

--
Gitblit v1.10.0