From cd55d08a6829f05a8a42fb0ca625615aa5be81f2 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 27 Mar 2013 12:12:50 +0000
Subject: [PATCH] OPENDJ-832 Leverage the work queue for processing requests received on the HTTP connection handler

---
 opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java |    7 ++-
 opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/RecursiveFutureResult.java          |   13 ++++++
 opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java           |   15 +++----
 opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java   |   12 ++++--
 opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java            |   13 +++---
 opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/AsynchronousFutureResult.java       |   25 +++++++++---
 opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java     |   10 +++--
 7 files changed, 63 insertions(+), 32 deletions(-)

diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java
index 294e9ee..4aa9c0f 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java
@@ -44,8 +44,10 @@
  * @param <S>
  *            The type of result returned by this future.
  */
-abstract class AbstractLDAPFutureResultImpl<S extends Result> extends AsynchronousFutureResult<S>
+abstract class AbstractLDAPFutureResultImpl<S extends Result>
+        extends AsynchronousFutureResult<S, ResultHandler<? super S>>
         implements IntermediateResponseHandler {
+
     private final Connection connection;
 
     private final int requestID;
@@ -54,9 +56,10 @@
 
     private volatile long timestamp;
 
-    AbstractLDAPFutureResultImpl(final int requestID, final ResultHandler<? super S> resultHandler,
-            final IntermediateResponseHandler intermediateResponseHandler,
-            final Connection connection) {
+    AbstractLDAPFutureResultImpl(final int requestID,
+        final ResultHandler<? super S> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler,
+        final Connection connection) {
         super(resultHandler);
         this.requestID = requestID;
         this.connection = connection;
@@ -72,6 +75,7 @@
         return requestID;
     }
 
+    /** {@inheritDoc} */
     @Override
     public final boolean handleIntermediateResponse(final IntermediateResponse response) {
         // FIXME: there's a potential race condition here - the future could
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/AsynchronousFutureResult.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/AsynchronousFutureResult.java
index 370a27d..a47d440 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/AsynchronousFutureResult.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/AsynchronousFutureResult.java
@@ -26,7 +26,7 @@
 
 package com.forgerock.opendj.util;
 
-import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
+import static org.forgerock.opendj.ldap.ErrorResultException.*;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -63,9 +63,13 @@
  * </ul>
  *
  * @param <M>
- *            The type of result returned by this completion future.
+ *          The type of result returned by this future.
+ * @param <H>
+ *          The type of {@link ResultHandler} associated to this future.
  */
-public class AsynchronousFutureResult<M> implements FutureResult<M>, ResultHandler<M> {
+public class AsynchronousFutureResult<M, H extends ResultHandler<? super M>> implements
+    FutureResult<M>, ResultHandler<M> {
+
     @SuppressWarnings("serial")
     private final class Sync extends AbstractQueuedSynchronizer {
         // State value representing the initial state before a result has
@@ -212,7 +216,7 @@
 
     private final Sync sync = new Sync();
 
-    private final ResultHandler<? super M> handler;
+    private final H handler;
 
     private final int requestID;
 
@@ -224,7 +228,7 @@
      *            A result handler which will be forwarded the result or error
      *            when it arrives, may be {@code null}.
      */
-    public AsynchronousFutureResult(final ResultHandler<? super M> handler) {
+    public AsynchronousFutureResult(final H handler) {
         this(handler, -1);
     }
 
@@ -239,7 +243,7 @@
      *            The request ID which will be returned by the default
      *            implementation of {@link #getRequestID}.
      */
-    public AsynchronousFutureResult(final ResultHandler<? super M> handler, final int requestID) {
+    public AsynchronousFutureResult(final H handler, final int requestID) {
         this.handler = handler;
         this.requestID = requestID;
     }
@@ -270,6 +274,15 @@
     }
 
     /**
+     * Returns the request handler associated to this FutureResult.
+     *
+     * @return the request handler associated to this FutureResult.
+     */
+    public H getRequestHandler() {
+        return handler;
+    }
+
+    /**
      * {@inheritDoc}
      * <p>
      * The default implementation returns the request ID passed in during
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/RecursiveFutureResult.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/RecursiveFutureResult.java
index db39636..9aa3085 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/RecursiveFutureResult.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/RecursiveFutureResult.java
@@ -47,11 +47,14 @@
  *            The type of the outer result.
  */
 public abstract class RecursiveFutureResult<M, N> implements FutureResult<N>, ResultHandler<M> {
-    private final class FutureResultImpl extends AsynchronousFutureResult<N> {
+
+    private final class FutureResultImpl extends AsynchronousFutureResult<N, ResultHandler<? super N>> {
+
         private FutureResultImpl(final ResultHandler<? super N> handler) {
             super(handler);
         }
 
+        @Override
         public int getRequestID() {
             if (innerFuture instanceof FutureResult<?>) {
                 final FutureResult<?> tmp = (FutureResult<?>) innerFuture;
@@ -97,6 +100,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public final boolean cancel(final boolean mayInterruptIfRunning) {
         return impl.cancel(mayInterruptIfRunning);
     }
@@ -104,6 +108,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public final N get() throws ErrorResultException, InterruptedException {
         return impl.get();
     }
@@ -111,6 +116,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public final N get(final long timeout, final TimeUnit unit) throws ErrorResultException,
             TimeoutException, InterruptedException {
         return impl.get(timeout, unit);
@@ -119,6 +125,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public final int getRequestID() {
         return impl.getRequestID();
     }
@@ -126,6 +133,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public final void handleErrorResult(final ErrorResultException error) {
         try {
             outerFuture = chainErrorResult(error, impl);
@@ -137,6 +145,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public final void handleResult(final M result) {
         try {
             outerFuture = chainResult(result, impl);
@@ -148,6 +157,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public final boolean isCancelled() {
         return impl.isCancelled();
     }
@@ -155,6 +165,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public final boolean isDone() {
         return impl.isDone();
     }
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
index 3951e19..aead5f3 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
@@ -27,7 +27,7 @@
 
 package org.forgerock.opendj.ldap;
 
-import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
+import static org.forgerock.opendj.ldap.ErrorResultException.*;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -70,6 +70,7 @@
         /**
          * {@inheritDoc}
          */
+        @Override
         public Connection getConnection() throws ErrorResultException {
             final Connection connection;
             try {
@@ -92,8 +93,8 @@
         @Override
         public FutureResult<Connection> getConnectionAsync(
                 final ResultHandler<? super Connection> resultHandler) {
-            final AsynchronousFutureResult<Connection> future =
-                    new AsynchronousFutureResult<Connection>(resultHandler);
+            final AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> future =
+                   new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(resultHandler);
 
             final ResultHandler<Connection> failoverHandler = new ResultHandler<Connection>() {
                 @Override
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
index df6795b..bc6cc11 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -27,9 +27,10 @@
 
 package org.forgerock.opendj.ldap;
 
-import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
-import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;
-import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
+import static com.forgerock.opendj.util.StaticUtils.*;
+
+import static org.forgerock.opendj.ldap.CoreMessages.*;
+import static org.forgerock.opendj.ldap.ErrorResultException.*;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -536,7 +537,7 @@
         }
 
         QueueElement(final ResultHandler<? super Connection> handler) {
-            this.value = new AsynchronousFutureResult<Connection>(handler);
+            this.value = new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(handler);
         }
 
         @Override
@@ -553,8 +554,8 @@
         }
 
         @SuppressWarnings("unchecked")
-        AsynchronousFutureResult<Connection> getWaitingFuture() {
-            return (AsynchronousFutureResult<Connection>) value;
+        AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> getWaitingFuture() {
+            return (AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>) value;
         }
 
         boolean isWaitingFuture() {
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
index 3e57b0b..10e018f 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -27,9 +27,10 @@
 
 package org.forgerock.opendj.ldap;
 
-import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
-import static java.lang.System.currentTimeMillis;
-import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
+import static com.forgerock.opendj.util.StaticUtils.*;
+import static java.lang.System.*;
+
+import static org.forgerock.opendj.ldap.ErrorResultException.*;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -85,7 +86,8 @@
          * @param <R>
          *            The type of result returned by the request.
          */
-        private abstract class DelayedFuture<R extends Result> extends AsynchronousFutureResult<R>
+        private abstract class DelayedFuture<R extends Result>
+                extends AsynchronousFutureResult<R, ResultHandler<? super R>>
                 implements Runnable {
             private volatile FutureResult<R> innerFuture = null;
 
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
index 0fbe857..7ffd37e 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
@@ -24,12 +24,11 @@
  *      Copyright 2010 Sun Microsystems, Inc.
  *      Portions copyright 2011-2012 ForgeRock AS.
  */
-
 package org.forgerock.opendj.ldap;
 
-import static org.fest.assertions.Assertions.assertThat;
-import static org.fest.assertions.Fail.fail;
-import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
+import static org.fest.assertions.Assertions.*;
+import static org.fest.assertions.Fail.*;
+import static org.forgerock.opendj.ldap.TestCaseUtils.*;
 
 import java.net.InetSocketAddress;
 import java.util.Arrays;
@@ -65,10 +64,10 @@
 public class LDAPListenerTestCase extends SdkTestCase {
 
     private static class MockServerConnection implements ServerConnection<Integer> {
-        final AsynchronousFutureResult<Throwable> connectionError =
-                new AsynchronousFutureResult<Throwable>(null);
-        final AsynchronousFutureResult<LDAPClientContext> context =
-                new AsynchronousFutureResult<LDAPClientContext>(null);
+        final AsynchronousFutureResult<Throwable, ResultHandler<? super Throwable>> connectionError =
+                new AsynchronousFutureResult<Throwable, ResultHandler<? super Throwable>>(null);
+        final AsynchronousFutureResult<LDAPClientContext, ResultHandler<? super LDAPClientContext>> context =
+                new AsynchronousFutureResult<LDAPClientContext, ResultHandler<? super LDAPClientContext>>(null);
         final CountDownLatch isClosed = new CountDownLatch(1);
 
         MockServerConnection() {

--
Gitblit v1.10.0