From a33a4959bae9ae6b119d5049609e4b439f911ee1 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 28 Mar 2013 16:32:52 +0000
Subject: [PATCH] Additional change for OPENDJ-354: Implement a RequestHandler which provides an in-memory backend
---
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java | 206 ++++++++++++++++++++++++++++++---------------------
1 files changed, 121 insertions(+), 85 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java
index 2f4ee76..191d943 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java
@@ -21,12 +21,18 @@
* CDDL HEADER END
*
*
- * Copyright 2011-2012 ForgeRock AS
+ * Copyright 2011-2013 ForgeRock AS
*/
package org.forgerock.opendj.ldap;
-import static org.forgerock.opendj.ldap.CoreMessages.*;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_ABANDON_REQUEST;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CANCEL_REQUEST;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CLIENT_DISCONNECT;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CLIENT_ERROR;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_SERVER_DISCONNECT;
+import static org.forgerock.opendj.ldap.CoreMessages.INFO_CLIENT_CONNECTION_CLOSING;
+import static org.forgerock.opendj.ldap.CoreMessages.WARN_CLIENT_DUPLICATE_MESSAGE_ID;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.util.Iterator;
@@ -72,9 +78,10 @@
private static class RequestContextImpl<S extends Result, H extends ResultHandler<? super S>>
implements RequestContext, ResultHandler<S> {
- // Adapter class which invokes cancel result handlers with correct
- // result
- // type.
+ /*
+ * Adapter class which invokes cancel result handlers with correct
+ * result type.
+ */
private static final class ExtendedResultHandlerHolder<R extends ExtendedResult> {
private final ExtendedRequest<R> request;
private final ResultHandler<? super R> resultHandler;
@@ -101,46 +108,46 @@
}
private static enum RequestState {
- // Request active
- PENDING,
-
// Request active, cancel requested
CANCEL_REQUESTED,
- // Request active, too late to cancel
- TOO_LATE,
+ // Result sent, was cancelled
+ CANCELLED,
+
+ // Request active
+ PENDING,
// Result sent, not cancelled
RESULT_SENT,
- // Result sent, was cancelled
- CANCELLED;
+ // Request active, too late to cancel
+ TOO_LATE;
}
- private final int messageID;
-
- // Cancellation state guarded by lock.
- private final Object stateLock = new Object();
+ protected final H resultHandler;
// These should be notified when a cancel request arrives, at most once.
private List<CancelRequestListener> cancelRequestListeners = null;
+ private LocalizableMessage cancelRequestReason = null;
+
// These should be notified when the result is set.
private List<ExtendedResultHandlerHolder<?>> cancelResultHandlers = null;
- private RequestState state = RequestState.PENDING;
-
- private LocalizableMessage cancelRequestReason = null;
-
- private boolean sendResult = true;
+ private final ServerConnectionImpl clientConnection;
private final boolean isCancelSupported;
- private final ServerConnectionImpl<?> clientConnection;
+ private final int messageID;
- protected final H resultHandler;
+ private boolean sendResult = true;
- protected RequestContextImpl(final ServerConnectionImpl<?> clientConnection,
+ private RequestState state = RequestState.PENDING;
+
+ // Cancellation state guarded by lock.
+ private final Object stateLock = new Object();
+
+ protected RequestContextImpl(final ServerConnectionImpl clientConnection,
final H resultHandler, final int messageID, final boolean isCancelSupported) {
this.clientConnection = clientConnection;
this.resultHandler = resultHandler;
@@ -171,9 +178,10 @@
case TOO_LATE:
case RESULT_SENT:
case CANCELLED:
- // No point in registering the callback since the request
- // can never be
- // cancelled now.
+ /*
+ * No point in registering the callback since the request
+ * can never be cancelled now.
+ */
break;
}
}
@@ -191,24 +199,28 @@
synchronized (stateLock) {
switch (state) {
case PENDING:
- // No cancel request, so no handlers, just switch state.
+ /* No cancel request, so no handlers, just switch state. */
if (signalTooLate) {
cancelRequestListeners = null;
state = RequestState.TOO_LATE;
}
break;
case CANCEL_REQUESTED:
- // Don't change state: let the handler ack the cancellation
- // request.
+ /*
+ * Don't change state: let the handler ack the cancellation
+ * request.
+ */
throw (CancelledResultException) newErrorResult(ResultCode.CANCELLED,
cancelRequestReason.toString());
case TOO_LATE:
- // Already too late. Nothing to do.
+ /* Already too late. Nothing to do. */
break;
case RESULT_SENT:
case CANCELLED:
- // This should not happen - could throw an illegal state
- // exception?
+ /*
+ * This should not happen - could throw an illegal state
+ * exception?
+ */
break;
}
}
@@ -229,11 +241,12 @@
public void handleErrorResult(final ErrorResultException error) {
if (clientConnection.removePendingRequest(this)) {
if (setResult(error.getResult())) {
- // FIXME: we must invoke the result handler even when
- // abandoned so
- // that chained result handlers may clean up, log, etc. We
- // really need
- // to signal that the result must not be sent to the client.
+ /*
+ * FIXME: we must invoke the result handler even when
+ * abandoned so that chained result handlers may clean up,
+ * log, etc. We really need to signal that the result must
+ * not be sent to the client.
+ */
}
resultHandler.handleErrorResult(error);
}
@@ -246,11 +259,12 @@
public void handleResult(final S result) {
if (clientConnection.removePendingRequest(this)) {
if (setResult(result)) {
- // FIXME: we must invoke the result handler even when
- // abandoned so
- // that chained result handlers may clean up, log, etc. We
- // really need
- // to signal that the result must not be sent to the client.
+ /*
+ * FIXME: we must invoke the result handler even when
+ * abandoned so that chained result handlers may clean up,
+ * log, etc. We really need to signal that the result must
+ * not be sent to the client.
+ */
}
resultHandler.handleResult(result);
}
@@ -291,7 +305,7 @@
synchronized (stateLock) {
switch (state) {
case PENDING:
- // Switch to CANCEL_REQUESTED state.
+ /* Switch to CANCEL_REQUESTED state. */
cancelRequestReason = reason;
if (cancelResultHandler != null) {
cancelResultHandlers = new LinkedList<ExtendedResultHandlerHolder<?>>();
@@ -304,7 +318,9 @@
this.sendResult &= sendResult;
break;
case CANCEL_REQUESTED:
- // Cancel already request so listeners already invoked.
+ /*
+ * Cancel already request so listeners already invoked.
+ */
if (cancelResultHandler != null) {
if (cancelResultHandlers == null) {
cancelResultHandlers = new LinkedList<ExtendedResultHandlerHolder<?>>();
@@ -315,20 +331,22 @@
break;
case TOO_LATE:
case RESULT_SENT:
- // Cannot cancel, so invoke result handler immediately
- // outside of
- // lock.
+ /*
+ * Cannot cancel, so invoke result handler immediately
+ * outside of lock.
+ */
if (cancelResultHandler != null) {
invokeResultHandler = true;
resultHandlerIsSuccess = false;
}
break;
case CANCELLED:
- // Multiple cancellation attempts. Clients should not do
- // this, but the
- // cancel will effectively succeed immediately, so invoke
- // result
- // handler immediately outside of lock.
+ /*
+ * Multiple cancellation attempts. Clients should not do
+ * this, but the cancel will effectively succeed
+ * immediately, so invoke result handler immediately outside
+ * of lock.
+ */
if (cancelResultHandler != null) {
invokeResultHandler = true;
resultHandlerIsSuccess = true;
@@ -337,7 +355,7 @@
}
}
- // Invoke listeners outside of lock.
+ /* Invoke listeners outside of lock. */
if (tmpListeners != null) {
for (final CancelRequestListener listener : tmpListeners) {
listener.handleCancelRequest(reason);
@@ -376,7 +394,7 @@
switch (state) {
case PENDING:
case TOO_LATE:
- // Switch to appropriate final state.
+ /* Switch to appropriate final state. */
if (!result.getResultCode().equals(ResultCode.CANCELLED)) {
state = RequestState.RESULT_SENT;
} else {
@@ -384,9 +402,10 @@
}
break;
case CANCEL_REQUESTED:
- // Switch to appropriate final state and invoke any cancel
- // request
- // handlers.
+ /*
+ * Switch to appropriate final state and invoke any cancel
+ * request handlers.
+ */
if (!result.getResultCode().equals(ResultCode.CANCELLED)) {
state = RequestState.RESULT_SENT;
} else {
@@ -399,14 +418,16 @@
break;
case RESULT_SENT:
case CANCELLED:
- // This should not happen - could throw an illegal state
- // exception?
+ /*
+ * This should not happen - could throw an illegal state
+ * exception?
+ */
maySendResult = false; // Prevent sending multiple results.
break;
}
}
- // Invoke handlers outside of lock.
+ /* Invoke handlers outside of lock. */
if (tmpHandlers != null) {
for (final ExtendedResultHandlerHolder<?> handler : tmpHandlers) {
if (isCancelled) {
@@ -427,7 +448,7 @@
private final static class SearchRequestContextImpl extends
RequestContextImpl<Result, SearchResultHandler> implements SearchResultHandler {
- private SearchRequestContextImpl(final ServerConnectionImpl<?> clientConnection,
+ private SearchRequestContextImpl(final ServerConnectionImpl clientConnection,
final SearchResultHandler resultHandler, final int messageID,
final boolean isCancelSupported) {
super(clientConnection, resultHandler, messageID, isCancelSupported);
@@ -450,11 +471,11 @@
}
}
- private static final class ServerConnectionImpl<C> implements ServerConnection<Integer> {
- private final RequestHandler<RequestContext> requestHandler;
+ private static final class ServerConnectionImpl implements ServerConnection<Integer> {
private final AtomicBoolean isClosed = new AtomicBoolean();
private final ConcurrentHashMap<Integer, RequestContextImpl<?, ?>> pendingRequests =
new ConcurrentHashMap<Integer, RequestContextImpl<?, ?>>();
+ private final RequestHandler<RequestContext> requestHandler;
private ServerConnectionImpl(final RequestHandler<RequestContext> requestHandler) {
this.requestHandler = requestHandler;
@@ -588,11 +609,11 @@
return;
}
- // Register the request in the pending requests table. Even
- // though
- // this request cannot be cancelled, it is important to do this
- // in
- // order to monitor the number of pending operations.
+ /*
+ * Register the request in the pending requests table. Even
+ * though this request cannot be cancelled, it is important to
+ * do this in order to monitor the number of pending operations.
+ */
final RequestContextImpl<R, ResultHandler<? super R>> requestContext =
new RequestContextImpl<R, ResultHandler<? super R>>(this, resultHandler,
messageID, false);
@@ -605,9 +626,10 @@
INFO_CANCELED_BY_CANCEL_REQUEST.get(messageID);
cancelledRequest.cancel(cancelReason, request, requestContext, true);
} else {
- // Couldn't find the request. Invoke on context in order
- // to remove
- // pending request.
+ /*
+ * Couldn't find the request. Invoke on context in order
+ * to remove pending request.
+ */
requestContext
.handleErrorResult(newErrorResult(ResultCode.NO_SUCH_OPERATION));
}
@@ -694,9 +716,10 @@
.toString()));
return false;
} else if (isClosed.get()) {
- // A concurrent close may have already removed the pending
- // request but
- // it will have only been notified for cancellation.
+ /*
+ * A concurrent close may have already removed the pending
+ * request but it will have only been notified for cancellation.
+ */
pendingRequests.remove(messageID);
final LocalizableMessage message = INFO_CLIENT_CONNECTION_CLOSING.get();
@@ -704,20 +727,21 @@
message.toString()));
return false;
} else {
- // If the connection is closed now then we just have to pay the
- // cost of
- // invoking the request in the request handler.
+ /*
+ * If the connection is closed now then we just have to pay the
+ * cost of invoking the request in the request handler.
+ */
return true;
}
}
private void doClose(final LocalizableMessage cancelReason) {
if (!isClosed.getAndSet(true)) {
- // At this point if any pending requests are added then we may
- // end up
- // cancelling them, but this does not matter since
- // addPendingRequest
- // will fail the request immediately.
+ /*
+ * At this point if any pending requests are added then we may
+ * end up cancelling them, but this does not matter since
+ * addPendingRequest will fail the request immediately.
+ */
final Iterator<RequestContextImpl<?, ?>> iterator =
pendingRequests.values().iterator();
while (iterator.hasNext()) {
@@ -752,6 +776,19 @@
}
+ /**
+ * Adapts the provided request handler as a {@code ServerConnection}.
+ *
+ * @param requestHandler
+ * The request handler.
+ * @return The server connection which will forward requests to the provided
+ * request handler.
+ */
+ static ServerConnection<Integer> adaptRequestHandler(
+ final RequestHandler<RequestContext> requestHandler) {
+ return new ServerConnectionImpl(requestHandler);
+ }
+
private final RequestHandlerFactory<C, RequestContext> factory;
/**
@@ -772,8 +809,7 @@
@Override
public ServerConnection<Integer> handleAccept(final C clientContext)
throws ErrorResultException {
- final RequestHandler<RequestContext> requestHandler = factory.handleAccept(clientContext);
- return new ServerConnectionImpl<C>(requestHandler);
+ return adaptRequestHandler(factory.handleAccept(clientContext));
}
}
--
Gitblit v1.10.0