From 3f7ddbf313aaabbfba4650cb2036cb41e51a9bde Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 18 Apr 2013 11:37:28 +0000
Subject: [PATCH] Fix OPENDJ-838: Add ConnectionFactory.close() method to facilitate resource cleanup after application exit

---
 opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java |  201 ++++++++++++++++++++++++--------------------------
 1 files changed, 97 insertions(+), 104 deletions(-)

diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
index 9f73c51..669195f 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2009-2010 Sun Microsystems, Inc.
- *      Portions copyright 2011-2012 ForgeRock AS.
+ *      Portions copyright 2011-2013 ForgeRock AS.
  */
 
 package org.forgerock.opendj.ldap;
@@ -40,6 +40,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 import java.util.logging.Level;
 
@@ -64,7 +65,7 @@
 
 import com.forgerock.opendj.util.AsynchronousFutureResult;
 import com.forgerock.opendj.util.FutureResultTransformer;
-import com.forgerock.opendj.util.StaticUtils;
+import com.forgerock.opendj.util.ReferenceCountedObject;
 import com.forgerock.opendj.util.Validator;
 
 /**
@@ -75,7 +76,7 @@
     /**
      * A connection that sends heart beats and supports all operations.
      */
-    private final class ConnectionImpl extends AbstractConnectionWrapper implements
+    private final class ConnectionImpl extends AbstractConnectionWrapper<Connection> implements
             ConnectionEventListener, SearchResultHandler {
 
         /**
@@ -85,9 +86,8 @@
          * @param <R>
          *            The type of result returned by the request.
          */
-        private abstract class DelayedFuture<R extends Result>
-                extends AsynchronousFutureResult<R, ResultHandler<? super R>>
-                implements Runnable {
+        private abstract class DelayedFuture<R extends Result> extends
+                AsynchronousFutureResult<R, ResultHandler<? super R>> implements Runnable {
             private volatile FutureResult<R> innerFuture = null;
 
             protected DelayedFuture(final ResultHandler<? super R> handler) {
@@ -123,14 +123,19 @@
 
         }
 
-        // List of pending Bind or StartTLS requests which must be invoked
-        // when the current heart beat completes.
+        /*
+         * List of pending Bind or StartTLS requests which must be invoked when
+         * the current heart beat completes.
+         */
         private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>();
 
-        // Coordinates heart-beats with Bind and StartTLS requests.
+        /* Coordinates heart-beats with Bind and StartTLS requests. */
         private final Sync sync = new Sync();
 
-        // Timestamp of last response received (any response, not just heart beats).
+        /*
+         * Timestamp of last response received (any response, not just heart
+         * beats).
+         */
         private volatile long timestamp = currentTimeMillis(); // Assume valid at creation.
 
         private ConnectionImpl(final Connection connection) {
@@ -206,8 +211,10 @@
                 return connection.bindAsync(request, intermediateResponseHandler, timestamper(
                         resultHandler, true));
             } else {
-                // A heart beat must be in progress so create a runnable task
-                // which will be executed when the heart beat completes.
+                /*
+                 * A heart beat must be in progress so create a runnable task
+                 * which will be executed when the heart beat completes.
+                 */
                 final DelayedFuture<BindResult> future =
                         new DelayedFuture<BindResult>(resultHandler) {
                             @Override
@@ -216,7 +223,10 @@
                                         timestamper(this, true));
                             }
                         };
-                // Enqueue and flush if the heart beat has completed in the mean time.
+                /*
+                 * Enqueue and flush if the heart beat has completed in the mean
+                 * time.
+                 */
                 pendingRequests.offer(future);
                 flushPendingRequests();
                 return future;
@@ -342,8 +352,11 @@
                     return connection.extendedRequestAsync(request, intermediateResponseHandler,
                             timestamper(resultHandler, true));
                 } else {
-                    // A heart beat must be in progress so create a runnable task
-                    // which will be executed when the heart beat completes.
+                    /*
+                     * A heart beat must be in progress so create a runnable
+                     * task which will be executed when the heart beat
+                     * completes.
+                     */
                     final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
                         @Override
                         public FutureResult<R> dispatch() {
@@ -351,7 +364,11 @@
                                     intermediateResponseHandler, timestamper(this, true));
                         }
                     };
-                    // Enqueue and flush if the heart beat has completed in the mean time.
+
+                    /*
+                     * Enqueue and flush if the heart beat has completed in the
+                     * mean time.
+                     */
                     pendingRequests.offer(future);
                     flushPendingRequests();
                     return future;
@@ -382,7 +399,7 @@
         @Override
         public void handleErrorResult(final ErrorResultException error) {
             if (DEBUG_LOG.isLoggable(Level.FINE)) {
-                DEBUG_LOG.fine(String.format("Heartbeat failed: " + error.getMessage()));
+                DEBUG_LOG.fine(String.format("Heartbeat failed: %s", error.getMessage()));
             }
             updateTimestamp();
             releaseHeartBeatLock();
@@ -582,8 +599,10 @@
         }
 
         private void acquireBindOrStartTLSLock() throws ErrorResultException {
-            // Wait for pending heartbeats and prevent new heartbeats from
-            // being sent while the bind is in progress.
+            /*
+             * Wait for pending heartbeats and prevent new heartbeats from being
+             * sent while the bind is in progress.
+             */
             try {
                 if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) {
                     // Give up - it looks like the connection is dead.
@@ -597,8 +616,11 @@
 
         private void flushPendingRequests() {
             if (!pendingRequests.isEmpty()) {
-                // The pending requests will acquire the shared lock, but we take
-                // it here anyway to ensure that pending requests do not get blocked.
+                /*
+                 * The pending requests will acquire the shared lock, but we
+                 * take it here anyway to ensure that pending requests do not
+                 * get blocked.
+                 */
                 if (sync.tryLockShared()) {
                     try {
                         Runnable pendingRequest;
@@ -617,7 +639,10 @@
                 connection.removeConnectionEventListener(this);
                 activeConnections.remove(this);
                 if (activeConnections.isEmpty()) {
-                    // This is the last active connection, so stop the heartbeat.
+                    /*
+                     * This is the last active connection, so stop the
+                     * heartbeat.
+                     */
                     heartBeatFuture.cancel(false);
                 }
             }
@@ -633,22 +658,33 @@
         }
 
         private void sendHeartBeat() {
-            // Only send the heartbeat if the connection has been idle for some time.
+            /*
+             * Only send the heartbeat if the connection has been idle for some
+             * time.
+             */
             if (currentTimeMillis() < (timestamp + minDelayMS)) {
                 return;
             }
 
-            // Don't send a heart beat if there is already a heart beat,
-            // bind, or startTLS in progress. Note that the bind/startTLS
-            // response will update the timestamp as if it were a heart beat.
+            /*
+             * Don't send a heart beat if there is already a heart beat, bind,
+             * or startTLS in progress. Note that the bind/startTLS response
+             * will update the timestamp as if it were a heart beat.
+             */
             if (sync.tryLockExclusively()) {
                 try {
                     connection.searchAsync(heartBeatRequest, null, this);
                 } catch (final Exception e) {
-                    // This may happen when we attempt to send the heart beat just
-                    // after the connection is closed but before we are notified.
+                    /*
+                     * This may happen when we attempt to send the heart beat
+                     * just after the connection is closed but before we are
+                     * notified.
+                     */
 
-                    // Release the lock because we're never going to get a response.
+                    /*
+                     * Release the lock because we're never going to get a
+                     * response.
+                     */
                     releaseHeartBeatLock();
                 }
             }
@@ -755,7 +791,7 @@
      * </ul>
      */
     private static final class Sync extends AbstractQueuedSynchronizer {
-        // Lock states. Positive values indicate that the shared lock is taken.
+        /* Lock states. Positive values indicate that the shared lock is taken. */
         private static final int UNLOCKED = 0; // initial state
         private static final int LOCKED_EXCLUSIVELY = -1;
 
@@ -809,8 +845,10 @@
                 }
                 final int newState = state - 1;
                 if (compareAndSetState(state, newState)) {
-                    // We could always return true here, but since there cannot
-                    // be waiting readers we can specialize for waiting writers.
+                    /*
+                     * We could always return true here, but since there cannot
+                     * be waiting readers we can specialize for waiting writers.
+                     */
                     return newState == UNLOCKED;
                 }
             }
@@ -851,83 +889,29 @@
     private final SearchRequest heartBeatRequest;
     private final long interval;
     private final long minDelayMS;
-    private final ScheduledExecutorService scheduler;
+    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
     private final long timeoutMS;
     private final TimeUnit unit;
+    private AtomicBoolean isClosed = new AtomicBoolean();
 
-    /**
-     * Creates a new heart-beat connection factory which will create connections
-     * using the provided connection factory and periodically ping any created
-     * connections in order to detect that they are still alive every 10 seconds
-     * using the default scheduler.
-     *
-     * @param factory
-     *            The connection factory to use for creating connections.
-     */
     HeartBeatConnectionFactory(final ConnectionFactory factory) {
-        this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler());
+        this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, null);
     }
 
-    /**
-     * Creates a new heart-beat connection factory which will create connections
-     * using the provided connection factory and periodically ping any created
-     * connections in order to detect that they are still alive using the
-     * specified frequency and the default scheduler.
-     *
-     * @param factory
-     *            The connection factory to use for creating connections.
-     * @param interval
-     *            The interval between keepalive pings.
-     * @param unit
-     *            The time unit for the interval between keepalive pings.
-     */
     HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
             final TimeUnit unit) {
-        this(factory, interval, unit, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler());
+        this(factory, interval, unit, DEFAULT_SEARCH, null);
     }
 
-    /**
-     * Creates a new heart-beat connection factory which will create connections
-     * using the provided connection factory and periodically ping any created
-     * connections using the specified search request in order to detect that
-     * they are still alive.
-     *
-     * @param factory
-     *            The connection factory to use for creating connections.
-     * @param interval
-     *            The interval between keepalive pings.
-     * @param unit
-     *            The time unit for the interval between keepalive pings.
-     * @param heartBeat
-     *            The search request to use for keepalive pings.
-     */
     HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
             final TimeUnit unit, final SearchRequest heartBeat) {
-        this(factory, interval, unit, heartBeat, StaticUtils.getDefaultScheduler());
+        this(factory, interval, unit, heartBeat, null);
     }
 
-    /**
-     * Creates a new heart-beat connection factory which will create connections
-     * using the provided connection factory and periodically ping any created
-     * connections using the specified search request in order to detect that
-     * they are still alive.
-     *
-     * @param factory
-     *            The connection factory to use for creating connections.
-     * @param interval
-     *            The interval between keepalive pings.
-     * @param unit
-     *            The time unit for the interval between keepalive pings.
-     * @param heartBeat
-     *            The search request to use for keepalive pings.
-     * @param scheduler
-     *            The scheduler which should for periodically sending keepalive
-     *            pings.
-     */
     HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
             final TimeUnit unit, final SearchRequest heartBeat,
             final ScheduledExecutorService scheduler) {
-        Validator.ensureNotNull(factory, heartBeat, unit, scheduler);
+        Validator.ensureNotNull(factory, heartBeat, unit);
         Validator.ensureTrue(interval >= 0, "negative timeout");
 
         this.heartBeatRequest = heartBeat;
@@ -935,22 +919,34 @@
         this.unit = unit;
         this.activeConnections = new LinkedList<ConnectionImpl>();
         this.factory = factory;
-        this.scheduler = scheduler;
+        this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
         this.timeoutMS = unit.toMillis(interval) * 2;
         this.minDelayMS = unit.toMillis(interval) / 2;
     }
 
-    /**
-     * {@inheritDoc}
-     */
+    @Override
+    public void close() {
+        if (isClosed.compareAndSet(false, true)) {
+            synchronized (activeConnections) {
+                if (!activeConnections.isEmpty()) {
+                    if (DEBUG_LOG.isLoggable(Level.FINE)) {
+                        DEBUG_LOG.fine(String.format(
+                                "HeartbeatConnectionFactory '%s' is closing while %d "
+                                        + "active connections remain", toString(),
+                                activeConnections.size()));
+                    }
+                }
+            }
+            scheduler.release();
+            factory.close();
+        }
+    }
+
     @Override
     public Connection getConnection() throws ErrorResultException {
         return adaptConnection(factory.getConnection());
     }
 
-    /**
-     * {@inheritDoc}
-     */
     @Override
     public FutureResult<Connection> getConnectionAsync(
             final ResultHandler<? super Connection> handler) {
@@ -967,9 +963,6 @@
         return future;
     }
 
-    /**
-     * {@inheritDoc}
-     */
     @Override
     public String toString() {
         final StringBuilder builder = new StringBuilder();
@@ -984,8 +977,8 @@
         synchronized (activeConnections) {
             connection.addConnectionEventListener(heartBeatConnection);
             if (activeConnections.isEmpty()) {
-                // This is the first active connection, so start the heart beat.
-                heartBeatFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
+                /* This is the first active connection, so start the heart beat. */
+                heartBeatFuture = scheduler.get().scheduleWithFixedDelay(new Runnable() {
                     @Override
                     public void run() {
                         final ConnectionImpl[] tmp;

--
Gitblit v1.10.0