From 5eaa5880d0d0fbc1290e66c1173e60f7da7b446c Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 28 Mar 2013 16:32:52 +0000
Subject: [PATCH] Additional change for OPENDJ-354: Implement a RequestHandler which provides an in-memory backend

---
 opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java |  206 ++++++++++++++++++++--------------
 opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java                  |  118 +++++++++++++++++--
 opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MemoryBackendTestCase.java        |   10 -
 3 files changed, 226 insertions(+), 108 deletions(-)

diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
index 6ab7125..92c7641 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -27,12 +27,15 @@
 
 package org.forgerock.opendj.ldap;
 
+import static org.forgerock.opendj.ldap.RequestHandlerFactoryAdapter.adaptRequestHandler;
+
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.forgerock.opendj.ldap.requests.BindRequest;
 import org.forgerock.opendj.ldap.requests.SearchRequest;
 
+import com.forgerock.opendj.ldap.InternalConnection;
 import com.forgerock.opendj.util.Validator;
 
 /**
@@ -195,6 +198,94 @@
     }
 
     /**
+     * Creates a new internal client connection which will route requests to the
+     * provided {@code RequestHandler}.
+     * <p>
+     * When processing requests, {@code RequestHandler} implementations are
+     * passed a {@code RequestContext} having a pseudo {@code requestID} which
+     * is incremented for each successive internal request on a per client
+     * connection basis. The request ID may be useful for logging purposes.
+     * <p>
+     * An internal connection does not require {@code RequestHandler}
+     * implementations to return a result when processing requests. However, it
+     * is recommended that implementations do always return results even for
+     * abandoned requests. This is because application client threads may block
+     * indefinitely waiting for results.
+     *
+     * @param requestHandler
+     *            The request handler which will be used for all client
+     *            connections.
+     * @return The new internal connection.
+     * @throws NullPointerException
+     *             If {@code requestHandler} was {@code null}.
+     */
+    public static Connection newInternalConnection(
+            final RequestHandler<RequestContext> requestHandler) {
+        Validator.ensureNotNull(requestHandler);
+        return newInternalConnection(adaptRequestHandler(requestHandler));
+    }
+
+    /**
+     * Creates a new internal client connection which will route requests to the
+     * provided {@code ServerConnection}.
+     * <p>
+     * When processing requests, {@code ServerConnection} implementations are
+     * passed an integer as the first parameter. This integer represents a
+     * pseudo {@code requestID} which is incremented for each successive
+     * internal request on a per client connection basis. The request ID may be
+     * useful for logging purposes.
+     * <p>
+     * An internal connection does not require {@code ServerConnection}
+     * implementations to return a result when processing requests. However, it
+     * is recommended that implementations do always return results even for
+     * abandoned requests. This is because application client threads may block
+     * indefinitely waiting for results.
+     *
+     * @param serverConnection
+     *            The server connection.
+     * @return The new internal connection.
+     * @throws NullPointerException
+     *             If {@code serverConnection} was {@code null}.
+     */
+    public static Connection newInternalConnection(final ServerConnection<Integer> serverConnection) {
+        Validator.ensureNotNull(serverConnection);
+        return new InternalConnection(serverConnection);
+    }
+
+    /**
+     * Creates a new connection factory which binds internal client connections
+     * to {@link RequestHandler}s created using the provided
+     * {@link RequestHandlerFactory}.
+     * <p>
+     * When processing requests, {@code RequestHandler} implementations are
+     * passed an integer as the first parameter. This integer represents a
+     * pseudo {@code requestID} which is incremented for each successive
+     * internal request on a per client connection basis. The request ID may be
+     * useful for logging purposes.
+     * <p>
+     * An internal connection factory does not require {@code RequestHandler}
+     * implementations to return a result when processing requests. However, it
+     * is recommended that implementations do always return results even for
+     * abandoned requests. This is because application client threads may block
+     * indefinitely waiting for results.
+     *
+     * @param <C>
+     *            The type of client context.
+     * @param factory
+     *            The request handler factory to use for creating connections.
+     * @param clientContext
+     *            The client context.
+     * @return The new internal connection factory.
+     * @throws NullPointerException
+     *             If {@code factory} was {@code null}.
+     */
+    public static <C> ConnectionFactory newInternalConnectionFactory(
+            final RequestHandlerFactory<C, RequestContext> factory, final C clientContext) {
+        Validator.ensureNotNull(factory);
+        return new InternalConnectionFactory<C>(newServerConnectionFactory(factory), clientContext);
+    }
+
+    /**
      * Creates a new connection factory which binds internal client connections
      * to {@link ServerConnection}s created using the provided
      * {@link ServerConnectionFactory}.
@@ -265,14 +356,14 @@
         return new ConnectionFactory() {
 
             @Override
-            public FutureResult<Connection> getConnectionAsync(
-                    final ResultHandler<? super Connection> handler) {
-                return factory.getConnectionAsync(handler);
+            public Connection getConnection() throws ErrorResultException {
+                return factory.getConnection();
             }
 
             @Override
-            public Connection getConnection() throws ErrorResultException {
-                return factory.getConnection();
+            public FutureResult<Connection> getConnectionAsync(
+                    final ResultHandler<? super Connection> handler) {
+                return factory.getConnectionAsync(handler);
             }
 
             /**
@@ -313,17 +404,12 @@
     public static <C> ServerConnectionFactory<C, Integer> newServerConnectionFactory(
             final RequestHandler<RequestContext> requestHandler) {
         Validator.ensureNotNull(requestHandler);
-
-        final RequestHandlerFactory<C, RequestContext> factory =
-                new RequestHandlerFactory<C, RequestContext>() {
-
-                    public RequestHandler<RequestContext> handleAccept(C clientContext)
-                            throws ErrorResultException {
-                        return requestHandler;
-                    }
-                };
-
-        return new RequestHandlerFactoryAdapter<C>(factory);
+        return new RequestHandlerFactoryAdapter<C>(new RequestHandlerFactory<C, RequestContext>() {
+            @Override
+            public RequestHandler<RequestContext> handleAccept(final C clientContext) {
+                return requestHandler;
+            }
+        });
     }
 
     /**
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java
index 2f4ee76..191d943 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java
@@ -21,12 +21,18 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2011-2012 ForgeRock AS
+ *      Copyright 2011-2013 ForgeRock AS
  */
 
 package org.forgerock.opendj.ldap;
 
-import static org.forgerock.opendj.ldap.CoreMessages.*;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_ABANDON_REQUEST;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CANCEL_REQUEST;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CLIENT_DISCONNECT;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CLIENT_ERROR;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_SERVER_DISCONNECT;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CLIENT_CONNECTION_CLOSING;
+import static org.forgerock.opendj.ldap.CoreMessages.WARN_CLIENT_DUPLICATE_MESSAGE_ID;
 import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
 
 import java.util.Iterator;
@@ -72,9 +78,10 @@
     private static class RequestContextImpl<S extends Result, H extends ResultHandler<? super S>>
             implements RequestContext, ResultHandler<S> {
 
-        // Adapter class which invokes cancel result handlers with correct
-        // result
-        // type.
+        /*
+         * Adapter class which invokes cancel result handlers with correct
+         * result type.
+         */
         private static final class ExtendedResultHandlerHolder<R extends ExtendedResult> {
             private final ExtendedRequest<R> request;
             private final ResultHandler<? super R> resultHandler;
@@ -101,46 +108,46 @@
         }
 
         private static enum RequestState {
-            // Request active
-            PENDING,
-
             // Request active, cancel requested
             CANCEL_REQUESTED,
 
-            // Request active, too late to cancel
-            TOO_LATE,
+            // Result sent, was cancelled
+            CANCELLED,
+
+            // Request active
+            PENDING,
 
             // Result sent, not cancelled
             RESULT_SENT,
 
-            // Result sent, was cancelled
-            CANCELLED;
+            // Request active, too late to cancel
+            TOO_LATE;
         }
 
-        private final int messageID;
-
-        // Cancellation state guarded by lock.
-        private final Object stateLock = new Object();
+        protected final H resultHandler;
 
         // These should be notified when a cancel request arrives, at most once.
         private List<CancelRequestListener> cancelRequestListeners = null;
 
+        private LocalizableMessage cancelRequestReason = null;
+
         // These should be notified when the result is set.
         private List<ExtendedResultHandlerHolder<?>> cancelResultHandlers = null;
 
-        private RequestState state = RequestState.PENDING;
-
-        private LocalizableMessage cancelRequestReason = null;
-
-        private boolean sendResult = true;
+        private final ServerConnectionImpl clientConnection;
 
         private final boolean isCancelSupported;
 
-        private final ServerConnectionImpl<?> clientConnection;
+        private final int messageID;
 
-        protected final H resultHandler;
+        private boolean sendResult = true;
 
-        protected RequestContextImpl(final ServerConnectionImpl<?> clientConnection,
+        private RequestState state = RequestState.PENDING;
+
+        // Cancellation state guarded by lock.
+        private final Object stateLock = new Object();
+
+        protected RequestContextImpl(final ServerConnectionImpl clientConnection,
                 final H resultHandler, final int messageID, final boolean isCancelSupported) {
             this.clientConnection = clientConnection;
             this.resultHandler = resultHandler;
@@ -171,9 +178,10 @@
                 case TOO_LATE:
                 case RESULT_SENT:
                 case CANCELLED:
-                    // No point in registering the callback since the request
-                    // can never be
-                    // cancelled now.
+                    /*
+                     * No point in registering the callback since the request
+                     * can never be cancelled now.
+                     */
                     break;
                 }
             }
@@ -191,24 +199,28 @@
             synchronized (stateLock) {
                 switch (state) {
                 case PENDING:
-                    // No cancel request, so no handlers, just switch state.
+                    /* No cancel request, so no handlers, just switch state. */
                     if (signalTooLate) {
                         cancelRequestListeners = null;
                         state = RequestState.TOO_LATE;
                     }
                     break;
                 case CANCEL_REQUESTED:
-                    // Don't change state: let the handler ack the cancellation
-                    // request.
+                    /*
+                     * Don't change state: let the handler ack the cancellation
+                     * request.
+                     */
                     throw (CancelledResultException) newErrorResult(ResultCode.CANCELLED,
                             cancelRequestReason.toString());
                 case TOO_LATE:
-                    // Already too late. Nothing to do.
+                    /* Already too late. Nothing to do. */
                     break;
                 case RESULT_SENT:
                 case CANCELLED:
-                    // This should not happen - could throw an illegal state
-                    // exception?
+                    /*
+                     * This should not happen - could throw an illegal state
+                     * exception?
+                     */
                     break;
                 }
             }
@@ -229,11 +241,12 @@
         public void handleErrorResult(final ErrorResultException error) {
             if (clientConnection.removePendingRequest(this)) {
                 if (setResult(error.getResult())) {
-                    // FIXME: we must invoke the result handler even when
-                    // abandoned so
-                    // that chained result handlers may clean up, log, etc. We
-                    // really need
-                    // to signal that the result must not be sent to the client.
+                    /*
+                     * FIXME: we must invoke the result handler even when
+                     * abandoned so that chained result handlers may clean up,
+                     * log, etc. We really need to signal that the result must
+                     * not be sent to the client.
+                     */
                 }
                 resultHandler.handleErrorResult(error);
             }
@@ -246,11 +259,12 @@
         public void handleResult(final S result) {
             if (clientConnection.removePendingRequest(this)) {
                 if (setResult(result)) {
-                    // FIXME: we must invoke the result handler even when
-                    // abandoned so
-                    // that chained result handlers may clean up, log, etc. We
-                    // really need
-                    // to signal that the result must not be sent to the client.
+                    /*
+                     * FIXME: we must invoke the result handler even when
+                     * abandoned so that chained result handlers may clean up,
+                     * log, etc. We really need to signal that the result must
+                     * not be sent to the client.
+                     */
                 }
                 resultHandler.handleResult(result);
             }
@@ -291,7 +305,7 @@
             synchronized (stateLock) {
                 switch (state) {
                 case PENDING:
-                    // Switch to CANCEL_REQUESTED state.
+                    /* Switch to CANCEL_REQUESTED state. */
                     cancelRequestReason = reason;
                     if (cancelResultHandler != null) {
                         cancelResultHandlers = new LinkedList<ExtendedResultHandlerHolder<?>>();
@@ -304,7 +318,9 @@
                     this.sendResult &= sendResult;
                     break;
                 case CANCEL_REQUESTED:
-                    // Cancel already request so listeners already invoked.
+                    /*
+                     * Cancel already request so listeners already invoked.
+                     */
                     if (cancelResultHandler != null) {
                         if (cancelResultHandlers == null) {
                             cancelResultHandlers = new LinkedList<ExtendedResultHandlerHolder<?>>();
@@ -315,20 +331,22 @@
                     break;
                 case TOO_LATE:
                 case RESULT_SENT:
-                    // Cannot cancel, so invoke result handler immediately
-                    // outside of
-                    // lock.
+                    /*
+                     * Cannot cancel, so invoke result handler immediately
+                     * outside of lock.
+                     */
                     if (cancelResultHandler != null) {
                         invokeResultHandler = true;
                         resultHandlerIsSuccess = false;
                     }
                     break;
                 case CANCELLED:
-                    // Multiple cancellation attempts. Clients should not do
-                    // this, but the
-                    // cancel will effectively succeed immediately, so invoke
-                    // result
-                    // handler immediately outside of lock.
+                    /*
+                     * Multiple cancellation attempts. Clients should not do
+                     * this, but the cancel will effectively succeed
+                     * immediately, so invoke result handler immediately outside
+                     * of lock.
+                     */
                     if (cancelResultHandler != null) {
                         invokeResultHandler = true;
                         resultHandlerIsSuccess = true;
@@ -337,7 +355,7 @@
                 }
             }
 
-            // Invoke listeners outside of lock.
+            /* Invoke listeners outside of lock. */
             if (tmpListeners != null) {
                 for (final CancelRequestListener listener : tmpListeners) {
                     listener.handleCancelRequest(reason);
@@ -376,7 +394,7 @@
                 switch (state) {
                 case PENDING:
                 case TOO_LATE:
-                    // Switch to appropriate final state.
+                    /* Switch to appropriate final state. */
                     if (!result.getResultCode().equals(ResultCode.CANCELLED)) {
                         state = RequestState.RESULT_SENT;
                     } else {
@@ -384,9 +402,10 @@
                     }
                     break;
                 case CANCEL_REQUESTED:
-                    // Switch to appropriate final state and invoke any cancel
-                    // request
-                    // handlers.
+                    /*
+                     * Switch to appropriate final state and invoke any cancel
+                     * request handlers.
+                     */
                     if (!result.getResultCode().equals(ResultCode.CANCELLED)) {
                         state = RequestState.RESULT_SENT;
                     } else {
@@ -399,14 +418,16 @@
                     break;
                 case RESULT_SENT:
                 case CANCELLED:
-                    // This should not happen - could throw an illegal state
-                    // exception?
+                    /*
+                     * This should not happen - could throw an illegal state
+                     * exception?
+                     */
                     maySendResult = false; // Prevent sending multiple results.
                     break;
                 }
             }
 
-            // Invoke handlers outside of lock.
+            /* Invoke handlers outside of lock. */
             if (tmpHandlers != null) {
                 for (final ExtendedResultHandlerHolder<?> handler : tmpHandlers) {
                     if (isCancelled) {
@@ -427,7 +448,7 @@
     private final static class SearchRequestContextImpl extends
             RequestContextImpl<Result, SearchResultHandler> implements SearchResultHandler {
 
-        private SearchRequestContextImpl(final ServerConnectionImpl<?> clientConnection,
+        private SearchRequestContextImpl(final ServerConnectionImpl clientConnection,
                 final SearchResultHandler resultHandler, final int messageID,
                 final boolean isCancelSupported) {
             super(clientConnection, resultHandler, messageID, isCancelSupported);
@@ -450,11 +471,11 @@
         }
     }
 
-    private static final class ServerConnectionImpl<C> implements ServerConnection<Integer> {
-        private final RequestHandler<RequestContext> requestHandler;
+    private static final class ServerConnectionImpl implements ServerConnection<Integer> {
         private final AtomicBoolean isClosed = new AtomicBoolean();
         private final ConcurrentHashMap<Integer, RequestContextImpl<?, ?>> pendingRequests =
                 new ConcurrentHashMap<Integer, RequestContextImpl<?, ?>>();
+        private final RequestHandler<RequestContext> requestHandler;
 
         private ServerConnectionImpl(final RequestHandler<RequestContext> requestHandler) {
             this.requestHandler = requestHandler;
@@ -588,11 +609,11 @@
                     return;
                 }
 
-                // Register the request in the pending requests table. Even
-                // though
-                // this request cannot be cancelled, it is important to do this
-                // in
-                // order to monitor the number of pending operations.
+                /*
+                 * Register the request in the pending requests table. Even
+                 * though this request cannot be cancelled, it is important to
+                 * do this in order to monitor the number of pending operations.
+                 */
                 final RequestContextImpl<R, ResultHandler<? super R>> requestContext =
                         new RequestContextImpl<R, ResultHandler<? super R>>(this, resultHandler,
                                 messageID, false);
@@ -605,9 +626,10 @@
                                 INFO_CANCELED_BY_CANCEL_REQUEST.get(messageID);
                         cancelledRequest.cancel(cancelReason, request, requestContext, true);
                     } else {
-                        // Couldn't find the request. Invoke on context in order
-                        // to remove
-                        // pending request.
+                        /*
+                         * Couldn't find the request. Invoke on context in order
+                         * to remove pending request.
+                         */
                         requestContext
                                 .handleErrorResult(newErrorResult(ResultCode.NO_SUCH_OPERATION));
                     }
@@ -694,9 +716,10 @@
                         .toString()));
                 return false;
             } else if (isClosed.get()) {
-                // A concurrent close may have already removed the pending
-                // request but
-                // it will have only been notified for cancellation.
+                /*
+                 * A concurrent close may have already removed the pending
+                 * request but it will have only been notified for cancellation.
+                 */
                 pendingRequests.remove(messageID);
 
                 final LocalizableMessage message = INFO_CLIENT_CONNECTION_CLOSING.get();
@@ -704,20 +727,21 @@
                         message.toString()));
                 return false;
             } else {
-                // If the connection is closed now then we just have to pay the
-                // cost of
-                // invoking the request in the request handler.
+                /*
+                 * If the connection is closed now then we just have to pay the
+                 * cost of invoking the request in the request handler.
+                 */
                 return true;
             }
         }
 
         private void doClose(final LocalizableMessage cancelReason) {
             if (!isClosed.getAndSet(true)) {
-                // At this point if any pending requests are added then we may
-                // end up
-                // cancelling them, but this does not matter since
-                // addPendingRequest
-                // will fail the request immediately.
+                /*
+                 * At this point if any pending requests are added then we may
+                 * end up cancelling them, but this does not matter since
+                 * addPendingRequest will fail the request immediately.
+                 */
                 final Iterator<RequestContextImpl<?, ?>> iterator =
                         pendingRequests.values().iterator();
                 while (iterator.hasNext()) {
@@ -752,6 +776,19 @@
 
     }
 
+    /**
+     * Adapts the provided request handler as a {@code ServerConnection}.
+     *
+     * @param requestHandler
+     *            The request handler.
+     * @return The server connection which will forward requests to the provided
+     *         request handler.
+     */
+    static ServerConnection<Integer> adaptRequestHandler(
+            final RequestHandler<RequestContext> requestHandler) {
+        return new ServerConnectionImpl(requestHandler);
+    }
+
     private final RequestHandlerFactory<C, RequestContext> factory;
 
     /**
@@ -772,8 +809,7 @@
     @Override
     public ServerConnection<Integer> handleAccept(final C clientContext)
             throws ErrorResultException {
-        final RequestHandler<RequestContext> requestHandler = factory.handleAccept(clientContext);
-        return new ServerConnectionImpl<C>(requestHandler);
+        return adaptRequestHandler(factory.handleAccept(clientContext));
     }
 
 }
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MemoryBackendTestCase.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MemoryBackendTestCase.java
index e008a3f..c695030 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MemoryBackendTestCase.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MemoryBackendTestCase.java
@@ -26,8 +26,7 @@
 package org.forgerock.opendj.ldap;
 
 import static org.fest.assertions.Assertions.assertThat;
-import static org.forgerock.opendj.ldap.Connections.newInternalConnectionFactory;
-import static org.forgerock.opendj.ldap.Connections.newServerConnectionFactory;
+import static org.forgerock.opendj.ldap.Connections.newInternalConnection;
 import static org.forgerock.opendj.ldap.requests.Requests.newAddRequest;
 import static org.forgerock.opendj.ldap.requests.Requests.newDeleteRequest;
 import static org.forgerock.opendj.ldap.requests.Requests.newModifyRequest;
@@ -398,7 +397,7 @@
                 getUser1Entry());
     }
 
-    private Connection getConnection() throws IOException, ErrorResultException {
+    private Connection getConnection() throws IOException {
         // @formatter:off
         final MemoryBackend backend =
                 new MemoryBackend(new LDIFEntryReader(
@@ -440,10 +439,7 @@
                 ));
         // @formatter:on
 
-        final Connection connection =
-                newInternalConnectionFactory(newServerConnectionFactory(backend), null)
-                        .getConnection();
-        return connection;
+        return newInternalConnection(backend);
     }
 
     private Entry getUser1Entry() {

--
Gitblit v1.10.0