From 965b47e7589c0d0179e00e321b09a8e743f15635 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 28 Mar 2013 23:22:04 +0000
Subject: [PATCH] OPENDJ-832 Leverage the work queue for processing requests received on the HTTP connection handler 

---
 opendj-sdk/opends/src/server/org/opends/server/protocols/http/HTTPClientConnection.java |  318 ++++++++++++++++++++++++++++++++++++++++++++++------
 1 files changed, 280 insertions(+), 38 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/protocols/http/HTTPClientConnection.java b/opendj-sdk/opends/src/server/org/opends/server/protocols/http/HTTPClientConnection.java
index 2496cf4..0306288 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/protocols/http/HTTPClientConnection.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/protocols/http/HTTPClientConnection.java
@@ -26,49 +26,128 @@
  */
 package org.opends.server.protocols.http;
 
+import static org.forgerock.opendj.adapter.server2x.Converters.*;
 import static org.opends.messages.ProtocolMessages.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.servlet.ServletRequest;
 
+import org.forgerock.opendj.ldap.ErrorResultException;
+import org.forgerock.opendj.ldap.SearchResultHandler;
+import org.forgerock.opendj.ldap.responses.Result;
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.api.ClientConnection;
 import org.opends.server.api.ConnectionHandler;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.SearchOperation;
-import org.opends.server.extensions.TLSCapableConnection;
+import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.types.CancelRequest;
 import org.opends.server.types.CancelResult;
 import org.opends.server.types.DN;
+import org.opends.server.types.DebugLogLevel;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.DisconnectReason;
 import org.opends.server.types.IntermediateResponse;
 import org.opends.server.types.Operation;
+import org.opends.server.types.ResultCode;
 import org.opends.server.types.SearchResultEntry;
 import org.opends.server.types.SearchResultReference;
 
+import com.forgerock.opendj.util.AsynchronousFutureResult;
+
 /**
  * This class defines an HTTP client connection, which is a type of client
  * connection that will be accepted by an instance of the HTTP connection
  * handler.
  */
-final class HTTPClientConnection extends ClientConnection implements
-    TLSCapableConnection
+final class HTTPClientConnection extends ClientConnection
 {
 
+  // TODO JNR Confirm with Matt that persistent searches are inapplicable to
+  // Rest2LDAP.
+  // TODO JNR Should I override getIdleTime()?
+  // TODO JNR Implement stats
+
+  /**
+   * Class grouping together an {@link Operation} and its associated
+   * {@link AsynchronousFutureResult} to ensure they are both atomically added
+   * and removed from the {@link HTTPClientConnection#operationsInProgress} Map.
+   */
+  private static final class OperationWithFutureResult
+  {
+
+    final Operation operation;
+    final AsynchronousFutureResult<Result, SearchResultHandler> futureResult;
+
+    public OperationWithFutureResult(Operation operation,
+        AsynchronousFutureResult<Result, SearchResultHandler> futureResult)
+    {
+      this.operation = operation;
+      this.futureResult = futureResult;
+    }
+
+  }
+
+  /** The tracer object for the debug logger. */
+  private static final DebugTracer TRACER = getTracer();
+
+  /**
+   * Official servlet property giving access to the SSF (Security Strength
+   * Factor) used to encrypt the current connection.
+   */
+  private static final String SERVLET_SSF_CONSTANT =
+      "javax.servlet.request.key_size";
+
+  /**
+   * Indicates whether the Directory Server believes this connection to be valid
+   * and available for communication.
+   */
+  private volatile boolean connectionValid;
+
+  /**
+   * Indicates whether this connection is about to be closed. This will be used
+   * to prevent accepting new requests while a disconnect is in progress.
+   */
+  private boolean disconnectRequested;
+
+  /**
+   * The Map (messageID => {@link OperationWithFutureResult}) of all operations
+   * currently in progress on this connection.
+   */
+  private final Map<Integer, OperationWithFutureResult> operationsInProgress =
+      new ConcurrentHashMap<Integer, OperationWithFutureResult>();
+
+  /**
+   * The number of operations performed on this connection. Used to compare with
+   * the resource limits of the network group.
+   */
+  private final AtomicLong operationsPerformed = new AtomicLong(0);
+
+  /**
+   * The lock used to provide threadsafe access to the map of operations in
+   * progress. This is used when we want to prevent puts on this map while we
+   * are removing all operations in progress.
+   */
+  private final Object opsInProgressLock = new Object();
+
+  /** The connection ID assigned to this connection. */
+  private final long connectionID;
+
   /** The reference to the connection handler that accepted this connection. */
   private final HTTPConnectionHandler connectionHandler;
 
   /** The servlet request representing this client connection. */
   private final ServletRequest request;
 
-  /** The connection ID assigned to this connection. */
-  private final long connectionID;
-
   /**
    * Constructs an instance of this class.
    *
@@ -179,24 +258,46 @@
   @Override
   public void sendResponse(Operation operation)
   {
-    // TODO Auto-generated method stub
+    OperationWithFutureResult op =
+        this.operationsInProgress.get(operation.getMessageID());
+    if (op != null)
+    {
+      try
+      {
+        op.futureResult.handleResult(getResponseResult(operation));
+      }
+      catch (ErrorResultException e)
+      {
+        op.futureResult.handleErrorResult(e);
+      }
+    }
   }
 
   /** {@inheritDoc} */
   @Override
-  public void sendSearchEntry(SearchOperation searchOperation,
+  public void sendSearchEntry(SearchOperation operation,
       SearchResultEntry searchEntry) throws DirectoryException
   {
-    // TODO Auto-generated method stub
+    OperationWithFutureResult op =
+        this.operationsInProgress.get(operation.getMessageID());
+    if (op != null)
+    {
+      op.futureResult.getResultHandler().handleEntry(from(searchEntry));
+    }
   }
 
   /** {@inheritDoc} */
   @Override
-  public boolean sendSearchReference(SearchOperation searchOperation,
+  public boolean sendSearchReference(SearchOperation operation,
       SearchResultReference searchReference) throws DirectoryException
   {
-    // TODO Auto-generated method stub
-    return false;
+    OperationWithFutureResult op =
+        this.operationsInProgress.get(operation.getMessageID());
+    if (op != null)
+    {
+      op.futureResult.getResultHandler().handleReference(from(searchReference));
+    }
+    return connectionValid;
   }
 
   /** {@inheritDoc} */
@@ -208,36 +309,128 @@
     return false;
   }
 
-  /** {@inheritDoc} */
+  /**
+   * {@inheritDoc}
+   *
+   * @param sendNotification
+   *          not used with HTTP.
+   */
   @Override
   public void disconnect(DisconnectReason disconnectReason,
       boolean sendNotification, Message message)
   {
-    // TODO Auto-generated method stub
+    // Set a flag indicating that the connection is being terminated so
+    // that no new requests will be accepted. Also cancel all operations
+    // in progress.
+    synchronized (opsInProgressLock)
+    {
+      // If we are already in the middle of a disconnect, then don't
+      // do anything.
+      if (disconnectRequested)
+      {
+        return;
+      }
+
+      disconnectRequested = true;
+    }
+
+    // TODO JNR
+    // if (keepStats)
+    // {
+    // statTracker.updateDisconnect();
+    // }
+
+    if (connectionID >= 0)
+    {
+      DirectoryServer.connectionClosed(this);
+    }
+
+    // Indicate that this connection is no longer valid.
+    connectionValid = false;
+
+    if (message != null)
+    {
+      MessageBuilder msgBuilder = new MessageBuilder();
+      msgBuilder.append(disconnectReason.getClosureMessage());
+      msgBuilder.append(": ");
+      msgBuilder.append(message);
+      cancelAllOperations(new CancelRequest(true, msgBuilder.toMessage()));
+    }
+    else
+    {
+      cancelAllOperations(new CancelRequest(true, disconnectReason
+          .getClosureMessage()));
+    }
+    finalizeConnectionInternal();
   }
 
   /** {@inheritDoc} */
   @Override
   public Collection<Operation> getOperationsInProgress()
   {
-    // TODO Auto-generated method stub
-    return null;
+    Collection<OperationWithFutureResult> values =
+        operationsInProgress.values();
+    Collection<Operation> results = new ArrayList<Operation>(values.size());
+    for (OperationWithFutureResult op : values)
+    {
+      results.add(op.operation);
+    }
+    return results;
   }
 
   /** {@inheritDoc} */
   @Override
   public Operation getOperationInProgress(int messageID)
   {
-    // TODO Auto-generated method stub
+    OperationWithFutureResult op = operationsInProgress.get(messageID);
+    if (op != null)
+    {
+      return op.operation;
+    }
     return null;
   }
 
+  /**
+   * Adds the passed in operation to the in progress list along with the
+   * associated future.
+   *
+   * @param operation
+   *          the operation to add to the in progress list
+   * @param futureResult
+   *          the future associated to the operation.
+   * @throws DirectoryException
+   *           If an error occurs
+   */
+  void addOperationInProgress(Operation operation,
+      AsynchronousFutureResult<Result, SearchResultHandler> futureResult)
+      throws DirectoryException
+  {
+    synchronized (opsInProgressLock)
+    {
+      // If we're already in the process of disconnecting the client,
+      // then reject the operation.
+      if (disconnectRequested)
+      {
+        Message message = WARN_CLIENT_DISCONNECT_IN_PROGRESS.get();
+        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
+      }
+
+      operationsInProgress.put(operation.getMessageID(),
+          new OperationWithFutureResult(operation, futureResult));
+    }
+  }
+
   /** {@inheritDoc} */
   @Override
   public boolean removeOperationInProgress(int messageID)
   {
-    // TODO Auto-generated method stub
-    return false;
+    final OperationWithFutureResult previousValue =
+        operationsInProgress.remove(messageID);
+    if (previousValue != null)
+    {
+      operationsPerformed.incrementAndGet();
+    }
+    return previousValue != null;
   }
 
   /** {@inheritDoc} */
@@ -245,15 +438,51 @@
   public CancelResult cancelOperation(int messageID,
       CancelRequest cancelRequest)
   {
-    // TODO Auto-generated method stub
-    return null;
+    OperationWithFutureResult op = operationsInProgress.remove(messageID);
+    if (op != null)
+    {
+      op.futureResult.handleErrorResult(ErrorResultException
+          .newErrorResult(org.forgerock.opendj.ldap.ResultCode.CANCELLED));
+      return op.operation.cancel(cancelRequest);
+    }
+    return new CancelResult(ResultCode.NO_SUCH_OPERATION, null);
   }
 
   /** {@inheritDoc} */
   @Override
   public void cancelAllOperations(CancelRequest cancelRequest)
   {
-    // TODO Auto-generated method stub
+    synchronized (opsInProgressLock)
+    {
+      try
+      {
+        for (OperationWithFutureResult op : operationsInProgress.values())
+        {
+          try
+          {
+            op.futureResult.handleErrorResult(ErrorResultException
+               .newErrorResult(org.forgerock.opendj.ldap.ResultCode.CANCELLED));
+            op.operation.abort(cancelRequest);
+          }
+          catch (Exception e)
+          { // make sure all operations are cancelled, no mattter what
+            if (debugEnabled())
+            {
+              TRACER.debugCaught(DebugLogLevel.ERROR, e);
+            }
+          }
+        }
+
+        operationsInProgress.clear();
+      }
+      catch (Exception e)
+      { // TODO JNR should I keep this catch?
+        if (debugEnabled())
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
+      }
+    }
   }
 
   /** {@inheritDoc} */
@@ -261,15 +490,25 @@
   public void cancelAllOperationsExcept(CancelRequest cancelRequest,
       int messageID)
   {
-    // TODO Auto-generated method stub
+    synchronized (opsInProgressLock)
+    {
+      OperationWithFutureResult toKeep = operationsInProgress.remove(messageID);
+      try
+      {
+        cancelAllOperations(cancelRequest);
+      }
+      finally
+      { // Ensure we always put back this operation
+        operationsInProgress.put(messageID, toKeep);
+      }
+    }
   }
 
   /** {@inheritDoc} */
   @Override
   public long getNumberOfOperations()
   {
-    // TODO Auto-generated method stub
-    return 0;
+    return this.operationsPerformed.get();
   }
 
   /** {@inheritDoc} */
@@ -310,19 +549,9 @@
 
   /** {@inheritDoc} */
   @Override
-  public boolean prepareTLS(MessageBuilder unavailableReason)
-  {
-    // TODO JNR add message to mention that this client connection cannot start
-    // TLS
-    unavailableReason.append(INFO_HTTP_CONNHANDLER_STARTTLS_NOT_SUPPORTED);
-    return false;
-  }
-
-  /** {@inheritDoc} */
-  @Override
   public int getSSF()
   {
-    Object attribute = request.getAttribute("javax.servlet.request.key_size");
+    Object attribute = request.getAttribute(SERVLET_SSF_CONSTANT);
     if (attribute instanceof Number)
     {
       return ((Number) attribute).intValue();
@@ -333,12 +562,25 @@
       {
         return Integer.parseInt((String) attribute);
       }
-      catch (IllegalArgumentException e)
+      catch (IllegalArgumentException ignored)
       {
-        // TODO tracer debug
+        // We cannot do much about it. Just log it.
+        if (debugEnabled())
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
+        }
       }
     }
     return 0;
   }
 
+  /**
+   * Returns whether the client connection is valid.
+   *
+   * @return true if the connection is valid, false otherwise
+   */
+  boolean isConnectionValid()
+  {
+    return connectionValid;
+  }
 }

--
Gitblit v1.10.0