From 5eaa5880d0d0fbc1290e66c1173e60f7da7b446c Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 28 Mar 2013 16:32:52 +0000
Subject: [PATCH] Additional change for OPENDJ-354: Implement a RequestHandler which provides an in-memory backend
---
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java | 206 ++++++++++++++++++++--------------
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java | 118 +++++++++++++++++--
opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MemoryBackendTestCase.java | 10 -
3 files changed, 226 insertions(+), 108 deletions(-)
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
index 6ab7125..92c7641 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -27,12 +27,15 @@
package org.forgerock.opendj.ldap;
+import static org.forgerock.opendj.ldap.RequestHandlerFactoryAdapter.adaptRequestHandler;
+
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.SearchRequest;
+import com.forgerock.opendj.ldap.InternalConnection;
import com.forgerock.opendj.util.Validator;
/**
@@ -195,6 +198,94 @@
}
/**
+ * Creates a new internal client connection which will route requests to the
+ * provided {@code RequestHandler}.
+ * <p>
+ * When processing requests, {@code RequestHandler} implementations are
+ * passed a {@code RequestContext} having a pseudo {@code requestID} which
+ * is incremented for each successive internal request on a per client
+ * connection basis. The request ID may be useful for logging purposes.
+ * <p>
+ * An internal connection does not require {@code RequestHandler}
+ * implementations to return a result when processing requests. However, it
+ * is recommended that implementations do always return results even for
+ * abandoned requests. This is because application client threads may block
+ * indefinitely waiting for results.
+ *
+ * @param requestHandler
+ * The request handler which will be used for all client
+ * connections.
+ * @return The new internal connection.
+ * @throws NullPointerException
+ * If {@code requestHandler} was {@code null}.
+ */
+ public static Connection newInternalConnection(
+ final RequestHandler<RequestContext> requestHandler) {
+ Validator.ensureNotNull(requestHandler);
+ return newInternalConnection(adaptRequestHandler(requestHandler));
+ }
+
+ /**
+ * Creates a new internal client connection which will route requests to the
+ * provided {@code ServerConnection}.
+ * <p>
+ * When processing requests, {@code ServerConnection} implementations are
+ * passed an integer as the first parameter. This integer represents a
+ * pseudo {@code requestID} which is incremented for each successive
+ * internal request on a per client connection basis. The request ID may be
+ * useful for logging purposes.
+ * <p>
+ * An internal connection does not require {@code ServerConnection}
+ * implementations to return a result when processing requests. However, it
+ * is recommended that implementations do always return results even for
+ * abandoned requests. This is because application client threads may block
+ * indefinitely waiting for results.
+ *
+ * @param serverConnection
+ * The server connection.
+ * @return The new internal connection.
+ * @throws NullPointerException
+ * If {@code serverConnection} was {@code null}.
+ */
+ public static Connection newInternalConnection(final ServerConnection<Integer> serverConnection) {
+ Validator.ensureNotNull(serverConnection);
+ return new InternalConnection(serverConnection);
+ }
+
+ /**
+ * Creates a new connection factory which binds internal client connections
+ * to {@link RequestHandler}s created using the provided
+ * {@link RequestHandlerFactory}.
+ * <p>
+ * When processing requests, {@code RequestHandler} implementations are
+ * passed an integer as the first parameter. This integer represents a
+ * pseudo {@code requestID} which is incremented for each successive
+ * internal request on a per client connection basis. The request ID may be
+ * useful for logging purposes.
+ * <p>
+ * An internal connection factory does not require {@code RequestHandler}
+ * implementations to return a result when processing requests. However, it
+ * is recommended that implementations do always return results even for
+ * abandoned requests. This is because application client threads may block
+ * indefinitely waiting for results.
+ *
+ * @param <C>
+ * The type of client context.
+ * @param factory
+ * The request handler factory to use for creating connections.
+ * @param clientContext
+ * The client context.
+ * @return The new internal connection factory.
+ * @throws NullPointerException
+ * If {@code factory} was {@code null}.
+ */
+ public static <C> ConnectionFactory newInternalConnectionFactory(
+ final RequestHandlerFactory<C, RequestContext> factory, final C clientContext) {
+ Validator.ensureNotNull(factory);
+ return new InternalConnectionFactory<C>(newServerConnectionFactory(factory), clientContext);
+ }
+
+ /**
* Creates a new connection factory which binds internal client connections
* to {@link ServerConnection}s created using the provided
* {@link ServerConnectionFactory}.
@@ -265,14 +356,14 @@
return new ConnectionFactory() {
@Override
- public FutureResult<Connection> getConnectionAsync(
- final ResultHandler<? super Connection> handler) {
- return factory.getConnectionAsync(handler);
+ public Connection getConnection() throws ErrorResultException {
+ return factory.getConnection();
}
@Override
- public Connection getConnection() throws ErrorResultException {
- return factory.getConnection();
+ public FutureResult<Connection> getConnectionAsync(
+ final ResultHandler<? super Connection> handler) {
+ return factory.getConnectionAsync(handler);
}
/**
@@ -313,17 +404,12 @@
public static <C> ServerConnectionFactory<C, Integer> newServerConnectionFactory(
final RequestHandler<RequestContext> requestHandler) {
Validator.ensureNotNull(requestHandler);
-
- final RequestHandlerFactory<C, RequestContext> factory =
- new RequestHandlerFactory<C, RequestContext>() {
-
- public RequestHandler<RequestContext> handleAccept(C clientContext)
- throws ErrorResultException {
- return requestHandler;
- }
- };
-
- return new RequestHandlerFactoryAdapter<C>(factory);
+ return new RequestHandlerFactoryAdapter<C>(new RequestHandlerFactory<C, RequestContext>() {
+ @Override
+ public RequestHandler<RequestContext> handleAccept(final C clientContext) {
+ return requestHandler;
+ }
+ });
}
/**
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java
index 2f4ee76..191d943 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java
@@ -21,12 +21,18 @@
* CDDL HEADER END
*
*
- * Copyright 2011-2012 ForgeRock AS
+ * Copyright 2011-2013 ForgeRock AS
*/
package org.forgerock.opendj.ldap;
-import static org.forgerock.opendj.ldap.CoreMessages.*;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_ABANDON_REQUEST;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CANCEL_REQUEST;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CLIENT_DISCONNECT;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CLIENT_ERROR;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_SERVER_DISCONNECT;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CLIENT_CONNECTION_CLOSING;
+import static org.forgerock.opendj.ldap.CoreMessages.WARN_CLIENT_DUPLICATE_MESSAGE_ID;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.util.Iterator;
@@ -72,9 +78,10 @@
private static class RequestContextImpl<S extends Result, H extends ResultHandler<? super S>>
implements RequestContext, ResultHandler<S> {
- // Adapter class which invokes cancel result handlers with correct
- // result
- // type.
+ /*
+ * Adapter class which invokes cancel result handlers with correct
+ * result type.
+ */
private static final class ExtendedResultHandlerHolder<R extends ExtendedResult> {
private final ExtendedRequest<R> request;
private final ResultHandler<? super R> resultHandler;
@@ -101,46 +108,46 @@
}
private static enum RequestState {
- // Request active
- PENDING,
-
// Request active, cancel requested
CANCEL_REQUESTED,
- // Request active, too late to cancel
- TOO_LATE,
+ // Result sent, was cancelled
+ CANCELLED,
+
+ // Request active
+ PENDING,
// Result sent, not cancelled
RESULT_SENT,
- // Result sent, was cancelled
- CANCELLED;
+ // Request active, too late to cancel
+ TOO_LATE;
}
- private final int messageID;
-
- // Cancellation state guarded by lock.
- private final Object stateLock = new Object();
+ protected final H resultHandler;
// These should be notified when a cancel request arrives, at most once.
private List<CancelRequestListener> cancelRequestListeners = null;
+ private LocalizableMessage cancelRequestReason = null;
+
// These should be notified when the result is set.
private List<ExtendedResultHandlerHolder<?>> cancelResultHandlers = null;
- private RequestState state = RequestState.PENDING;
-
- private LocalizableMessage cancelRequestReason = null;
-
- private boolean sendResult = true;
+ private final ServerConnectionImpl clientConnection;
private final boolean isCancelSupported;
- private final ServerConnectionImpl<?> clientConnection;
+ private final int messageID;
- protected final H resultHandler;
+ private boolean sendResult = true;
- protected RequestContextImpl(final ServerConnectionImpl<?> clientConnection,
+ private RequestState state = RequestState.PENDING;
+
+ // Cancellation state guarded by lock.
+ private final Object stateLock = new Object();
+
+ protected RequestContextImpl(final ServerConnectionImpl clientConnection,
final H resultHandler, final int messageID, final boolean isCancelSupported) {
this.clientConnection = clientConnection;
this.resultHandler = resultHandler;
@@ -171,9 +178,10 @@
case TOO_LATE:
case RESULT_SENT:
case CANCELLED:
- // No point in registering the callback since the request
- // can never be
- // cancelled now.
+ /*
+ * No point in registering the callback since the request
+ * can never be cancelled now.
+ */
break;
}
}
@@ -191,24 +199,28 @@
synchronized (stateLock) {
switch (state) {
case PENDING:
- // No cancel request, so no handlers, just switch state.
+ /* No cancel request, so no handlers, just switch state. */
if (signalTooLate) {
cancelRequestListeners = null;
state = RequestState.TOO_LATE;
}
break;
case CANCEL_REQUESTED:
- // Don't change state: let the handler ack the cancellation
- // request.
+ /*
+ * Don't change state: let the handler ack the cancellation
+ * request.
+ */
throw (CancelledResultException) newErrorResult(ResultCode.CANCELLED,
cancelRequestReason.toString());
case TOO_LATE:
- // Already too late. Nothing to do.
+ /* Already too late. Nothing to do. */
break;
case RESULT_SENT:
case CANCELLED:
- // This should not happen - could throw an illegal state
- // exception?
+ /*
+ * This should not happen - could throw an illegal state
+ * exception?
+ */
break;
}
}
@@ -229,11 +241,12 @@
public void handleErrorResult(final ErrorResultException error) {
if (clientConnection.removePendingRequest(this)) {
if (setResult(error.getResult())) {
- // FIXME: we must invoke the result handler even when
- // abandoned so
- // that chained result handlers may clean up, log, etc. We
- // really need
- // to signal that the result must not be sent to the client.
+ /*
+ * FIXME: we must invoke the result handler even when
+ * abandoned so that chained result handlers may clean up,
+ * log, etc. We really need to signal that the result must
+ * not be sent to the client.
+ */
}
resultHandler.handleErrorResult(error);
}
@@ -246,11 +259,12 @@
public void handleResult(final S result) {
if (clientConnection.removePendingRequest(this)) {
if (setResult(result)) {
- // FIXME: we must invoke the result handler even when
- // abandoned so
- // that chained result handlers may clean up, log, etc. We
- // really need
- // to signal that the result must not be sent to the client.
+ /*
+ * FIXME: we must invoke the result handler even when
+ * abandoned so that chained result handlers may clean up,
+ * log, etc. We really need to signal that the result must
+ * not be sent to the client.
+ */
}
resultHandler.handleResult(result);
}
@@ -291,7 +305,7 @@
synchronized (stateLock) {
switch (state) {
case PENDING:
- // Switch to CANCEL_REQUESTED state.
+ /* Switch to CANCEL_REQUESTED state. */
cancelRequestReason = reason;
if (cancelResultHandler != null) {
cancelResultHandlers = new LinkedList<ExtendedResultHandlerHolder<?>>();
@@ -304,7 +318,9 @@
this.sendResult &= sendResult;
break;
case CANCEL_REQUESTED:
- // Cancel already request so listeners already invoked.
+ /*
+ * Cancel already request so listeners already invoked.
+ */
if (cancelResultHandler != null) {
if (cancelResultHandlers == null) {
cancelResultHandlers = new LinkedList<ExtendedResultHandlerHolder<?>>();
@@ -315,20 +331,22 @@
break;
case TOO_LATE:
case RESULT_SENT:
- // Cannot cancel, so invoke result handler immediately
- // outside of
- // lock.
+ /*
+ * Cannot cancel, so invoke result handler immediately
+ * outside of lock.
+ */
if (cancelResultHandler != null) {
invokeResultHandler = true;
resultHandlerIsSuccess = false;
}
break;
case CANCELLED:
- // Multiple cancellation attempts. Clients should not do
- // this, but the
- // cancel will effectively succeed immediately, so invoke
- // result
- // handler immediately outside of lock.
+ /*
+ * Multiple cancellation attempts. Clients should not do
+ * this, but the cancel will effectively succeed
+ * immediately, so invoke result handler immediately outside
+ * of lock.
+ */
if (cancelResultHandler != null) {
invokeResultHandler = true;
resultHandlerIsSuccess = true;
@@ -337,7 +355,7 @@
}
}
- // Invoke listeners outside of lock.
+ /* Invoke listeners outside of lock. */
if (tmpListeners != null) {
for (final CancelRequestListener listener : tmpListeners) {
listener.handleCancelRequest(reason);
@@ -376,7 +394,7 @@
switch (state) {
case PENDING:
case TOO_LATE:
- // Switch to appropriate final state.
+ /* Switch to appropriate final state. */
if (!result.getResultCode().equals(ResultCode.CANCELLED)) {
state = RequestState.RESULT_SENT;
} else {
@@ -384,9 +402,10 @@
}
break;
case CANCEL_REQUESTED:
- // Switch to appropriate final state and invoke any cancel
- // request
- // handlers.
+ /*
+ * Switch to appropriate final state and invoke any cancel
+ * request handlers.
+ */
if (!result.getResultCode().equals(ResultCode.CANCELLED)) {
state = RequestState.RESULT_SENT;
} else {
@@ -399,14 +418,16 @@
break;
case RESULT_SENT:
case CANCELLED:
- // This should not happen - could throw an illegal state
- // exception?
+ /*
+ * This should not happen - could throw an illegal state
+ * exception?
+ */
maySendResult = false; // Prevent sending multiple results.
break;
}
}
- // Invoke handlers outside of lock.
+ /* Invoke handlers outside of lock. */
if (tmpHandlers != null) {
for (final ExtendedResultHandlerHolder<?> handler : tmpHandlers) {
if (isCancelled) {
@@ -427,7 +448,7 @@
private final static class SearchRequestContextImpl extends
RequestContextImpl<Result, SearchResultHandler> implements SearchResultHandler {
- private SearchRequestContextImpl(final ServerConnectionImpl<?> clientConnection,
+ private SearchRequestContextImpl(final ServerConnectionImpl clientConnection,
final SearchResultHandler resultHandler, final int messageID,
final boolean isCancelSupported) {
super(clientConnection, resultHandler, messageID, isCancelSupported);
@@ -450,11 +471,11 @@
}
}
- private static final class ServerConnectionImpl<C> implements ServerConnection<Integer> {
- private final RequestHandler<RequestContext> requestHandler;
+ private static final class ServerConnectionImpl implements ServerConnection<Integer> {
private final AtomicBoolean isClosed = new AtomicBoolean();
private final ConcurrentHashMap<Integer, RequestContextImpl<?, ?>> pendingRequests =
new ConcurrentHashMap<Integer, RequestContextImpl<?, ?>>();
+ private final RequestHandler<RequestContext> requestHandler;
private ServerConnectionImpl(final RequestHandler<RequestContext> requestHandler) {
this.requestHandler = requestHandler;
@@ -588,11 +609,11 @@
return;
}
- // Register the request in the pending requests table. Even
- // though
- // this request cannot be cancelled, it is important to do this
- // in
- // order to monitor the number of pending operations.
+ /*
+ * Register the request in the pending requests table. Even
+ * though this request cannot be cancelled, it is important to
+ * do this in order to monitor the number of pending operations.
+ */
final RequestContextImpl<R, ResultHandler<? super R>> requestContext =
new RequestContextImpl<R, ResultHandler<? super R>>(this, resultHandler,
messageID, false);
@@ -605,9 +626,10 @@
INFO_CANCELED_BY_CANCEL_REQUEST.get(messageID);
cancelledRequest.cancel(cancelReason, request, requestContext, true);
} else {
- // Couldn't find the request. Invoke on context in order
- // to remove
- // pending request.
+ /*
+ * Couldn't find the request. Invoke on context in order
+ * to remove pending request.
+ */
requestContext
.handleErrorResult(newErrorResult(ResultCode.NO_SUCH_OPERATION));
}
@@ -694,9 +716,10 @@
.toString()));
return false;
} else if (isClosed.get()) {
- // A concurrent close may have already removed the pending
- // request but
- // it will have only been notified for cancellation.
+ /*
+ * A concurrent close may have already removed the pending
+ * request but it will have only been notified for cancellation.
+ */
pendingRequests.remove(messageID);
final LocalizableMessage message = INFO_CLIENT_CONNECTION_CLOSING.get();
@@ -704,20 +727,21 @@
message.toString()));
return false;
} else {
- // If the connection is closed now then we just have to pay the
- // cost of
- // invoking the request in the request handler.
+ /*
+ * If the connection is closed now then we just have to pay the
+ * cost of invoking the request in the request handler.
+ */
return true;
}
}
private void doClose(final LocalizableMessage cancelReason) {
if (!isClosed.getAndSet(true)) {
- // At this point if any pending requests are added then we may
- // end up
- // cancelling them, but this does not matter since
- // addPendingRequest
- // will fail the request immediately.
+ /*
+ * At this point if any pending requests are added then we may
+ * end up cancelling them, but this does not matter since
+ * addPendingRequest will fail the request immediately.
+ */
final Iterator<RequestContextImpl<?, ?>> iterator =
pendingRequests.values().iterator();
while (iterator.hasNext()) {
@@ -752,6 +776,19 @@
}
+ /**
+ * Adapts the provided request handler as a {@code ServerConnection}.
+ *
+ * @param requestHandler
+ * The request handler.
+ * @return The server connection which will forward requests to the provided
+ * request handler.
+ */
+ static ServerConnection<Integer> adaptRequestHandler(
+ final RequestHandler<RequestContext> requestHandler) {
+ return new ServerConnectionImpl(requestHandler);
+ }
+
private final RequestHandlerFactory<C, RequestContext> factory;
/**
@@ -772,8 +809,7 @@
@Override
public ServerConnection<Integer> handleAccept(final C clientContext)
throws ErrorResultException {
- final RequestHandler<RequestContext> requestHandler = factory.handleAccept(clientContext);
- return new ServerConnectionImpl<C>(requestHandler);
+ return adaptRequestHandler(factory.handleAccept(clientContext));
}
}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MemoryBackendTestCase.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MemoryBackendTestCase.java
index e008a3f..c695030 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MemoryBackendTestCase.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MemoryBackendTestCase.java
@@ -26,8 +26,7 @@
package org.forgerock.opendj.ldap;
import static org.fest.assertions.Assertions.assertThat;
-import static org.forgerock.opendj.ldap.Connections.newInternalConnectionFactory;
-import static org.forgerock.opendj.ldap.Connections.newServerConnectionFactory;
+import static org.forgerock.opendj.ldap.Connections.newInternalConnection;
import static org.forgerock.opendj.ldap.requests.Requests.newAddRequest;
import static org.forgerock.opendj.ldap.requests.Requests.newDeleteRequest;
import static org.forgerock.opendj.ldap.requests.Requests.newModifyRequest;
@@ -398,7 +397,7 @@
getUser1Entry());
}
- private Connection getConnection() throws IOException, ErrorResultException {
+ private Connection getConnection() throws IOException {
// @formatter:off
final MemoryBackend backend =
new MemoryBackend(new LDIFEntryReader(
@@ -440,10 +439,7 @@
));
// @formatter:on
- final Connection connection =
- newInternalConnectionFactory(newServerConnectionFactory(backend), null)
- .getConnection();
- return connection;
+ return newInternalConnection(backend);
}
private Entry getUser1Entry() {
--
Gitblit v1.10.0