mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
19.05.2012 07713219e3a14e99063afaab6cfb726dfb17f318
Minor simplification for OPENDJ-660: HeartbeatConnectionFactory should avoid doing heart-beats and Bind/StartTLS operations concurrently

Pending requests can be managed using a CLQ now that a custom synchronizer is used (early prototype used a Semaphore which required additional synchronization for managing pending requests).
1 files modified
61 ■■■■ changed files
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java 61 ●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -34,6 +34,8 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -122,8 +124,7 @@
        // List of pending Bind or StartTLS requests which must be invoked
        // when the current heart beat completes.
        private List<Runnable> pendingRequests = null;
        private final Object pendingRequestsLock = new Object();
        private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>();
        // Coordinates heart-beats with Bind and StartTLS requests.
        private final Sync sync = new Sync();
@@ -214,7 +215,9 @@
                                        timestamper(this, true));
                            }
                        };
                addPendingRequest(future);
                // Enqueue and flush if the heart beat has completed in the mean time.
                pendingRequests.offer(future);
                flushPendingRequests();
                return future;
            }
        }
@@ -347,7 +350,9 @@
                                    intermediateResponseHandler, timestamper(this, true));
                        }
                    };
                    addPendingRequest(future);
                    // Enqueue and flush if the heart beat has completed in the mean time.
                    pendingRequests.offer(future);
                    flushPendingRequests();
                    return future;
                }
            } else {
@@ -589,27 +594,14 @@
            }
        }
        private void addPendingRequest(final DelayedFuture<? extends Result> runner) {
            List<Runnable> tmp = null;
            synchronized (pendingRequestsLock) {
                if (pendingRequests == null) {
                    pendingRequests = new LinkedList<Runnable>();
                }
                pendingRequests.add(runner);
                // The heart beat may have completed in which case we must try
                // to invoke the pending request(s) now so that they are not left
                // stranded. Keep the lock until the requests have been dispatched
                // to avoid becoming blocked during the dispatch when the runner
                // attempts to acquire the shared lock.
        private void flushPendingRequests() {
            if (!pendingRequests.isEmpty()) {
                // The pending requests will acquire the shared lock, but we take
                // it here anyway to ensure that pending requests do not get blocked.
                if (sync.tryLockShared()) {
                    tmp = pendingRequests;
                    pendingRequests = null;
                }
            }
            if (tmp != null) {
                try {
                    for (final Runnable pendingRequest : tmp) {
                        Runnable pendingRequest;
                        while ((pendingRequest = pendingRequests.poll()) != null) {
                        pendingRequest.run();
                    }
                } finally {
@@ -617,6 +609,7 @@
                }
            }
        }
        }
        private void notifyClosed() {
            synchronized (activeConnections) {
@@ -635,27 +628,7 @@
        private void releaseHeartBeatLock() {
            sync.unlockExclusively();
            List<Runnable> tmp = null;
            synchronized (pendingRequestsLock) {
                if (pendingRequests != null) {
                    // Invoke any pending request(s). Keep the lock until the requests
                    // have been dispatched to avoid becoming blocked during the dispatch
                    // when the runner attempts to acquire the shared lock.
                    if (sync.tryLockShared()) {
                        tmp = pendingRequests;
                        pendingRequests = null;
                    }
                }
            }
            if (tmp != null) {
                try {
                    for (final Runnable pendingRequest : tmp) {
                        pendingRequest.run();
                    }
                } finally {
                    sync.unlockShared();
                }
            }
            flushPendingRequests();
        }
        private void sendHeartBeat() {