| | |
| | | import java.util.Collection; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.Queue; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.ScheduledFuture; |
| | | import java.util.concurrent.TimeUnit; |
| | |
| | | |
| | | // Pending Bind or StartTLS requests which must be invoked |
| | | // when the current heart beat completes. |
| | | private List<Runnable> pendingRequests = null; |
| | | private final Object pendingRequestsLock = new Object(); |
| | | private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>(); |
| | | |
| | | // Coordinates heart-beats with Bind and StartTLS requests. |
| | | private final Sync sync = new Sync(); |
| | |
| | | timestamper(this, true)); |
| | | } |
| | | }; |
| | | addPendingRequest(future); |
| | | // Enqueue the request, then flush in case the heart beat has completed in the meantime. |
| | | pendingRequests.offer(future); |
| | | flushPendingRequests(); |
| | | return future; |
| | | } |
| | | } |
| | |
| | | intermediateResponseHandler, timestamper(this, true)); |
| | | } |
| | | }; |
| | | addPendingRequest(future); |
| | | // Enqueue the request, then flush in case the heart beat has completed in the meantime. |
| | | pendingRequests.offer(future); |
| | | flushPendingRequests(); |
| | | return future; |
| | | } |
| | | } else { |
| | |
| | | } |
| | | } |
| | | |
| | | private void addPendingRequest(final DelayedFuture<? extends Result> runner) { |
| | | List<Runnable> tmp = null; |
| | | synchronized (pendingRequestsLock) { |
| | | if (pendingRequests == null) { |
| | | pendingRequests = new LinkedList<Runnable>(); |
| | | } |
| | | pendingRequests.add(runner); |
| | | |
| | | // The heart beat may have completed in which case we must try |
| | | // to invoke the pending request(s) now so that they are not left |
| | | // stranded. Keep the lock until the requests have been dispatched |
| | | // to avoid becoming blocked during the dispatch when the runner |
| | | // attempts to acquire the shared lock. |
| | | private void flushPendingRequests() { |
| | | if (!pendingRequests.isEmpty()) { |
| | | // The pending requests will acquire the shared lock, but we take |
| | | // it here anyway to ensure that pending requests do not get blocked. |
| | | if (sync.tryLockShared()) { |
| | | tmp = pendingRequests; |
| | | pendingRequests = null; |
| | | } |
| | | } |
| | | if (tmp != null) { |
| | | try { |
| | | for (final Runnable pendingRequest : tmp) { |
| | | Runnable pendingRequest; |
| | | while ((pendingRequest = pendingRequests.poll()) != null) { |
| | | pendingRequest.run(); |
| | | } |
| | | } finally { |
| | |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void notifyClosed() { |
| | | synchronized (activeConnections) { |
| | |
| | | |
| | | // Releases the exclusive lock taken for the heart beat, then dispatches any |
| | | // Bind or StartTLS requests that were queued while the heart beat was in |
| | | // progress. |
| | | private void releaseHeartBeatLock() { |
| | | // Release the exclusive lock first: the flush below only dispatches when |
| | | // sync.tryLockShared() succeeds (presumably impossible while the exclusive |
| | | // lock is still held - confirm against Sync). |
| | | sync.unlockExclusively(); |
| | | // Draining is delegated entirely to flushPendingRequests(). The former |
| | | // hand-rolled drain duplicated that logic and nulled out pendingRequests, |
| | | // which conflicts with the final queue that flushPendingRequests() reads. |
| | | flushPendingRequests(); |
| | | } |
| | | |
| | | private void sendHeartBeat() { |