From 965b47e7589c0d0179e00e321b09a8e743f15635 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 28 Mar 2013 23:22:04 +0000
Subject: [PATCH] OPENDJ-832 Leverage the work queue for processing requests received on the HTTP connection handler
---
opendj-sdk/opends/src/server/org/opends/server/protocols/http/HTTPClientConnection.java | 318 ++++++++++++++++++++++++++++++++++++++++++++++------
1 files changed, 280 insertions(+), 38 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/protocols/http/HTTPClientConnection.java b/opendj-sdk/opends/src/server/org/opends/server/protocols/http/HTTPClientConnection.java
index 2496cf4..0306288 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/protocols/http/HTTPClientConnection.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/protocols/http/HTTPClientConnection.java
@@ -26,49 +26,128 @@
*/
package org.opends.server.protocols.http;
+import static org.forgerock.opendj.adapter.server2x.Converters.*;
import static org.opends.messages.ProtocolMessages.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletRequest;
+import org.forgerock.opendj.ldap.ErrorResultException;
+import org.forgerock.opendj.ldap.SearchResultHandler;
+import org.forgerock.opendj.ldap.responses.Result;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.api.ClientConnection;
import org.opends.server.api.ConnectionHandler;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.SearchOperation;
-import org.opends.server.extensions.TLSCapableConnection;
+import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.CancelRequest;
import org.opends.server.types.CancelResult;
import org.opends.server.types.DN;
+import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.DisconnectReason;
import org.opends.server.types.IntermediateResponse;
import org.opends.server.types.Operation;
+import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchResultReference;
+import com.forgerock.opendj.util.AsynchronousFutureResult;
+
/**
* This class defines an HTTP client connection, which is a type of client
* connection that will be accepted by an instance of the HTTP connection
* handler.
*/
-final class HTTPClientConnection extends ClientConnection implements
- TLSCapableConnection
+final class HTTPClientConnection extends ClientConnection
{
+ // TODO JNR Confirm with Matt that persistent searches are inapplicable to
+ // Rest2LDAP.
+ // TODO JNR Should I override getIdleTime()?
+ // TODO JNR Implement stats
+
+ /**
+ * Class grouping together an {@link Operation} and its associated
+ * {@link AsynchronousFutureResult} to ensure they are both atomically added
+ * and removed from the {@link HTTPClientConnection#operationsInProgress} Map.
+ */
+ private static final class OperationWithFutureResult
+ {
+
+ final Operation operation;
+ final AsynchronousFutureResult<Result, SearchResultHandler> futureResult;
+
+ public OperationWithFutureResult(Operation operation,
+ AsynchronousFutureResult<Result, SearchResultHandler> futureResult)
+ {
+ this.operation = operation;
+ this.futureResult = futureResult;
+ }
+
+ }
+
+ /** The tracer object for the debug logger. */
+ private static final DebugTracer TRACER = getTracer();
+
+ /**
+ * Official servlet property giving access to the SSF (Security Strength
+ * Factor) used to encrypt the current connection.
+ */
+ private static final String SERVLET_SSF_CONSTANT =
+ "javax.servlet.request.key_size";
+
+ /**
+ * Indicates whether the Directory Server believes this connection to be valid
+ * and available for communication.
+ */
+ private volatile boolean connectionValid;
+
+ /**
+ * Indicates whether this connection is about to be closed. This will be used
+ * to prevent accepting new requests while a disconnect is in progress.
+ */
+ private boolean disconnectRequested;
+
+ /**
+ * The Map (messageID => {@link OperationWithFutureResult}) of all operations
+ * currently in progress on this connection.
+ */
+ private final Map<Integer, OperationWithFutureResult> operationsInProgress =
+ new ConcurrentHashMap<Integer, OperationWithFutureResult>();
+
+ /**
+ * The number of operations performed on this connection. Used to compare with
+ * the resource limits of the network group.
+ */
+ private final AtomicLong operationsPerformed = new AtomicLong(0);
+
+ /**
+ * The lock used to provide threadsafe access to the map of operations in
+ * progress. This is used when we want to prevent puts on this map while we
+ * are removing all operations in progress.
+ */
+ private final Object opsInProgressLock = new Object();
+
+ /** The connection ID assigned to this connection. */
+ private final long connectionID;
+
/** The reference to the connection handler that accepted this connection. */
private final HTTPConnectionHandler connectionHandler;
/** The servlet request representing this client connection. */
private final ServletRequest request;
- /** The connection ID assigned to this connection. */
- private final long connectionID;
-
/**
* Constructs an instance of this class.
*
@@ -179,24 +258,46 @@
@Override
public void sendResponse(Operation operation)
{
- // TODO Auto-generated method stub
+ OperationWithFutureResult op =
+ this.operationsInProgress.get(operation.getMessageID());
+ if (op != null)
+ {
+ try
+ {
+ op.futureResult.handleResult(getResponseResult(operation));
+ }
+ catch (ErrorResultException e)
+ {
+ op.futureResult.handleErrorResult(e);
+ }
+ }
}
/** {@inheritDoc} */
@Override
- public void sendSearchEntry(SearchOperation searchOperation,
+ public void sendSearchEntry(SearchOperation operation,
SearchResultEntry searchEntry) throws DirectoryException
{
- // TODO Auto-generated method stub
+ OperationWithFutureResult op =
+ this.operationsInProgress.get(operation.getMessageID());
+ if (op != null)
+ {
+ op.futureResult.getResultHandler().handleEntry(from(searchEntry));
+ }
}
/** {@inheritDoc} */
@Override
- public boolean sendSearchReference(SearchOperation searchOperation,
+ public boolean sendSearchReference(SearchOperation operation,
SearchResultReference searchReference) throws DirectoryException
{
- // TODO Auto-generated method stub
- return false;
+ OperationWithFutureResult op =
+ this.operationsInProgress.get(operation.getMessageID());
+ if (op != null)
+ {
+ op.futureResult.getResultHandler().handleReference(from(searchReference));
+ }
+ return connectionValid;
}
/** {@inheritDoc} */
@@ -208,36 +309,128 @@
return false;
}
- /** {@inheritDoc} */
+ /**
+ * {@inheritDoc}
+ *
+ * @param sendNotification
+ * not used with HTTP.
+ */
@Override
public void disconnect(DisconnectReason disconnectReason,
boolean sendNotification, Message message)
{
- // TODO Auto-generated method stub
+ // Set a flag indicating that the connection is being terminated so
+ // that no new requests will be accepted. Also cancel all operations
+ // in progress.
+ synchronized (opsInProgressLock)
+ {
+ // If we are already in the middle of a disconnect, then don't
+ // do anything.
+ if (disconnectRequested)
+ {
+ return;
+ }
+
+ disconnectRequested = true;
+ }
+
+ // TODO JNR
+ // if (keepStats)
+ // {
+ // statTracker.updateDisconnect();
+ // }
+
+ if (connectionID >= 0)
+ {
+ DirectoryServer.connectionClosed(this);
+ }
+
+ // Indicate that this connection is no longer valid.
+ connectionValid = false;
+
+ if (message != null)
+ {
+ MessageBuilder msgBuilder = new MessageBuilder();
+ msgBuilder.append(disconnectReason.getClosureMessage());
+ msgBuilder.append(": ");
+ msgBuilder.append(message);
+ cancelAllOperations(new CancelRequest(true, msgBuilder.toMessage()));
+ }
+ else
+ {
+ cancelAllOperations(new CancelRequest(true, disconnectReason
+ .getClosureMessage()));
+ }
+ finalizeConnectionInternal();
}
/** {@inheritDoc} */
@Override
public Collection<Operation> getOperationsInProgress()
{
- // TODO Auto-generated method stub
- return null;
+ Collection<OperationWithFutureResult> values =
+ operationsInProgress.values();
+ Collection<Operation> results = new ArrayList<Operation>(values.size());
+ for (OperationWithFutureResult op : values)
+ {
+ results.add(op.operation);
+ }
+ return results;
}
/** {@inheritDoc} */
@Override
public Operation getOperationInProgress(int messageID)
{
- // TODO Auto-generated method stub
+ OperationWithFutureResult op = operationsInProgress.get(messageID);
+ if (op != null)
+ {
+ return op.operation;
+ }
return null;
}
+ /**
+ * Adds the passed in operation to the in progress list along with the
+ * associated future.
+ *
+ * @param operation
+ * the operation to add to the in progress list
+ * @param futureResult
+ * the future associated to the operation.
+ * @throws DirectoryException
+ * If an error occurs
+ */
+ void addOperationInProgress(Operation operation,
+ AsynchronousFutureResult<Result, SearchResultHandler> futureResult)
+ throws DirectoryException
+ {
+ synchronized (opsInProgressLock)
+ {
+ // If we're already in the process of disconnecting the client,
+ // then reject the operation.
+ if (disconnectRequested)
+ {
+ Message message = WARN_CLIENT_DISCONNECT_IN_PROGRESS.get();
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
+ }
+
+ operationsInProgress.put(operation.getMessageID(),
+ new OperationWithFutureResult(operation, futureResult));
+ }
+ }
+
/** {@inheritDoc} */
@Override
public boolean removeOperationInProgress(int messageID)
{
- // TODO Auto-generated method stub
- return false;
+ final OperationWithFutureResult previousValue =
+ operationsInProgress.remove(messageID);
+ if (previousValue != null)
+ {
+ operationsPerformed.incrementAndGet();
+ }
+ return previousValue != null;
}
/** {@inheritDoc} */
@@ -245,15 +438,51 @@
public CancelResult cancelOperation(int messageID,
CancelRequest cancelRequest)
{
- // TODO Auto-generated method stub
- return null;
+ OperationWithFutureResult op = operationsInProgress.remove(messageID);
+ if (op != null)
+ {
+ op.futureResult.handleErrorResult(ErrorResultException
+ .newErrorResult(org.forgerock.opendj.ldap.ResultCode.CANCELLED));
+ return op.operation.cancel(cancelRequest);
+ }
+ return new CancelResult(ResultCode.NO_SUCH_OPERATION, null);
}
/** {@inheritDoc} */
@Override
public void cancelAllOperations(CancelRequest cancelRequest)
{
- // TODO Auto-generated method stub
+ synchronized (opsInProgressLock)
+ {
+ try
+ {
+ for (OperationWithFutureResult op : operationsInProgress.values())
+ {
+ try
+ {
+ op.futureResult.handleErrorResult(ErrorResultException
+ .newErrorResult(org.forgerock.opendj.ldap.ResultCode.CANCELLED));
+ op.operation.abort(cancelRequest);
+ }
+ catch (Exception e)
+ { // make sure all operations are cancelled, no mattter what
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ }
+
+ operationsInProgress.clear();
+ }
+ catch (Exception e)
+ { // TODO JNR should I keep this catch?
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ }
}
/** {@inheritDoc} */
@@ -261,15 +490,25 @@
public void cancelAllOperationsExcept(CancelRequest cancelRequest,
int messageID)
{
- // TODO Auto-generated method stub
+ synchronized (opsInProgressLock)
+ {
+ OperationWithFutureResult toKeep = operationsInProgress.remove(messageID);
+ try
+ {
+ cancelAllOperations(cancelRequest);
+ }
+ finally
+ { // Ensure we always put back this operation
+ operationsInProgress.put(messageID, toKeep);
+ }
+ }
}
/** {@inheritDoc} */
@Override
public long getNumberOfOperations()
{
- // TODO Auto-generated method stub
- return 0;
+ return this.operationsPerformed.get();
}
/** {@inheritDoc} */
@@ -310,19 +549,9 @@
/** {@inheritDoc} */
@Override
- public boolean prepareTLS(MessageBuilder unavailableReason)
- {
- // TODO JNR add message to mention that this client connection cannot start
- // TLS
- unavailableReason.append(INFO_HTTP_CONNHANDLER_STARTTLS_NOT_SUPPORTED);
- return false;
- }
-
- /** {@inheritDoc} */
- @Override
public int getSSF()
{
- Object attribute = request.getAttribute("javax.servlet.request.key_size");
+ Object attribute = request.getAttribute(SERVLET_SSF_CONSTANT);
if (attribute instanceof Number)
{
return ((Number) attribute).intValue();
@@ -333,12 +562,25 @@
{
return Integer.parseInt((String) attribute);
}
- catch (IllegalArgumentException e)
+ catch (IllegalArgumentException ignored)
{
- // TODO tracer debug
+ // We cannot do much about it. Just log it.
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
+ }
}
}
return 0;
}
+ /**
+ * Returns whether the client connection is valid.
+ *
+ * @return true if the connection is valid, false otherwise
+ */
+ boolean isConnectionValid()
+ {
+ return connectionValid;
+ }
}
--
Gitblit v1.10.0