From 818af554fe494570725f20a03eb04880fe313406 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 08 Mar 2013 14:53:47 +0000
Subject: [PATCH] Partial fix for OPENDJ-699: Implement DN reference mapping
---
opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Context.java | 335 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 333 insertions(+), 2 deletions(-)
diff --git a/opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Context.java b/opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Context.java
index 2a65484..d57a832 100644
--- a/opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Context.java
+++ b/opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Context.java
@@ -15,18 +15,183 @@
*/
package org.forgerock.opendj.rest2ldap;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
+
import java.io.Closeable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.json.resource.ServerContext;
+import org.forgerock.opendj.ldap.AbstractAsynchronousConnection;
import org.forgerock.opendj.ldap.Connection;
+import org.forgerock.opendj.ldap.ConnectionEventListener;
+import org.forgerock.opendj.ldap.DN;
+import org.forgerock.opendj.ldap.ErrorResultException;
+import org.forgerock.opendj.ldap.FutureResult;
+import org.forgerock.opendj.ldap.IntermediateResponseHandler;
+import org.forgerock.opendj.ldap.ResultHandler;
+import org.forgerock.opendj.ldap.SearchResultHandler;
+import org.forgerock.opendj.ldap.SearchScope;
+import org.forgerock.opendj.ldap.requests.AbandonRequest;
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
+import org.forgerock.opendj.ldap.responses.BindResult;
+import org.forgerock.opendj.ldap.responses.CompareResult;
+import org.forgerock.opendj.ldap.responses.ExtendedResult;
+import org.forgerock.opendj.ldap.responses.Result;
+import org.forgerock.opendj.ldap.responses.SearchResultEntry;
+import org.forgerock.opendj.ldap.responses.SearchResultReference;
/**
- * Common context information passed to containers and mappers.
+ * Common context information passed to containers and mappers. A new context is
+ * allocated for each REST request.
*/
final class Context implements Closeable {
+
+ /*
+ * A cached base object read request. Concurrent identical reads share a
+ * single LDAP search: the first request performs the search and subsequent
+ * matching requests register their result handlers against it. See
+ * cachedReads for more information.
+ */
+ private static final class CachedRead implements SearchResultHandler {
+ private SearchResultEntry cachedEntry;
+ private final String cachedFilterString;
+ private FutureResult<Result> cachedFuture; // Guarded by latch.
+ private final CountDownLatch cachedFutureLatch = new CountDownLatch(1);
+ private final SearchRequest cachedRequest;
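+ // Non-null once the search has completed. The volatile write also publishes cachedEntry to other threads.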
+ private volatile Result cachedResult;
+ private final ConcurrentLinkedQueue<SearchResultHandler> waitingResultHandlers =
+ new ConcurrentLinkedQueue<SearchResultHandler>();
+
+ CachedRead(final SearchRequest request, final SearchResultHandler resultHandler) {
+ this.cachedRequest = request;
+ this.cachedFilterString = request.getFilter().toString();
+ this.waitingResultHandlers.add(resultHandler);
+ }
+
+ @Override
+ public boolean handleEntry(final SearchResultEntry entry) {
+ cachedEntry = entry;
+ return true;
+ }
+
+ @Override
+ public void handleErrorResult(final ErrorResultException error) {
+ handleResult(error.getResult());
+ }
+
+ @Override
+ public boolean handleReference(final SearchResultReference reference) {
+ // Ignore - should never happen for a base object search.
+ return true;
+ }
+
+ @Override
+ public void handleResult(final Result result) {
+ cachedResult = result;
+ drainQueue();
+ }
+
+ void addResultHandler(final SearchResultHandler resultHandler) {
+ // Fast path: the search has already completed.
+ if (cachedResult != null) {
+ invokeResultHandler(resultHandler);
+ return;
+ }
+ // Enqueue, then re-check in case the search completed concurrently
+ // (drainQueue may already have run and missed this handler).
+ waitingResultHandlers.add(resultHandler);
+ if (cachedResult != null) {
+ drainQueue();
+ }
+ }
+
+ FutureResult<Result> getFutureResult() {
+ // Perform uninterrupted wait since this method is unlikely to block for a long time.
+ boolean wasInterrupted = false;
+ while (true) {
+ try {
+ cachedFutureLatch.await();
+ if (wasInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ return cachedFuture;
+ } catch (final InterruptedException e) {
+ wasInterrupted = true;
+ }
+ }
+ }
+
+ boolean isMatchingRead(final SearchRequest request) {
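+ // Note: the base DN is not compared because cachedReads is keyed by DN,
+ // so the DNs are already known to be equal.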
+ // Cached reads are always base object.
+ if (!request.getScope().equals(SearchScope.BASE_OBJECT)) {
+ return false;
+ }
+
+ // Filters must match.
+ if (!request.getFilter().toString().equals(cachedFilterString)) {
+ return false;
+ }
+
+ // List of requested attributes must match.
+ if (!request.getAttributes().equals(cachedRequest.getAttributes())) {
+ return false;
+ }
+
+ // Don't need to check anything else.
+ return true;
+ }
+
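+ // Invoked once the underlying search has been issued; releases any
+ // threads blocked in getFutureResult().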
+ void setFuture(final FutureResult<Result> future) {
+ cachedFuture = future;
+ cachedFutureLatch.countDown();
+ }
+
+ private void drainQueue() {
+ SearchResultHandler resultHandler;
+ while ((resultHandler = waitingResultHandlers.poll()) != null) {
+ invokeResultHandler(resultHandler);
+ }
+ }
+
+ private void invokeResultHandler(final SearchResultHandler resultHandler) {
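+ // cachedResult is non-null by the time this method is called, and the
+ // volatile read of it guarantees that cachedEntry is visible here.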
+ if (cachedEntry != null) {
+ resultHandler.handleEntry(cachedEntry);
+ }
+ if (cachedResult.isSuccess()) {
+ resultHandler.handleResult(cachedResult);
+ } else {
+ resultHandler.handleErrorResult(newErrorResult(cachedResult));
+ }
+ }
+
+ }
+
+ /*
+ * An LRU cache of recent read requests, used to reduce the number of
+ * repeated read operations performed when resolving DN references. The
+ * map uses access ordering so that the least recently used entry is
+ * evicted first. All access to the map is synchronized on the map itself.
+ */
+ @SuppressWarnings("serial")
+ private final Map<DN, CachedRead> cachedReads = new LinkedHashMap<DN, CachedRead>(16, 0.75f, true) {
+ private static final int MAX_CACHED_ENTRIES = 32;
+
+ @Override
+ protected boolean removeEldestEntry(final Map.Entry<DN, CachedRead> eldest) {
+ return size() > MAX_CACHED_ENTRIES;
+ }
+ };
+
private final Config config;
+
private final AtomicReference<Connection> connection = new AtomicReference<Connection>();
+
private final ServerContext context;
Context(final Config config, final ServerContext context) {
@@ -58,9 +223,175 @@
}
void setConnection(final Connection connection) {
- if (!this.connection.compareAndSet(null, connection)) {
+ if (!this.connection.compareAndSet(null, withCache(connection))) {
throw new IllegalStateException("LDAP connection obtained multiple times");
}
}
+ /*
+ * Adds read caching support to the provided connection.
+ */
+ private Connection withCache(final Connection connection) {
+ /*
+ * We only use async methods so no need to wrap sync methods.
+ */
+ return new AbstractAsynchronousConnection() {
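+ /*
+ * Reads (base object searches) are answered from cachedReads where
+ * possible. Operations which may modify entries invalidate the cache:
+ * delete and modify evict the targeted DN, while bind, extended, and
+ * modify DN requests clear the cache entirely.
+ */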
+
+ @Override
+ public FutureResult<Void> abandonAsync(final AbandonRequest request) {
+ return connection.abandonAsync(request);
+ }
+
+ @Override
+ public FutureResult<Result> addAsync(final AddRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler) {
+ return connection.addAsync(request, intermediateResponseHandler, resultHandler);
+ }
+
+ @Override
+ public void addConnectionEventListener(final ConnectionEventListener listener) {
+ connection.addConnectionEventListener(listener);
+ }
+
+ @Override
+ public FutureResult<BindResult> bindAsync(final BindRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super BindResult> resultHandler) {
+ /*
+ * Simple brute force implementation in case the bind operation
+ * modifies an entry: clear the cachedReads.
+ */
+ evictAll();
+ return connection.bindAsync(request, intermediateResponseHandler, resultHandler);
+ }
+
+ @Override
+ public void close(final UnbindRequest request, final String reason) {
+ connection.close(request, reason);
+ }
+
+ @Override
+ public FutureResult<CompareResult> compareAsync(final CompareRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super CompareResult> resultHandler) {
+ return connection.compareAsync(request, intermediateResponseHandler, resultHandler);
+ }
+
+ @Override
+ public FutureResult<Result> deleteAsync(final DeleteRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler) {
+ evict(request.getName());
+ return connection.deleteAsync(request, intermediateResponseHandler, resultHandler);
+ }
+
+ @Override
+ public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
+ final ExtendedRequest<R> request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super R> resultHandler) {
+ /*
+ * Simple brute force implementation in case the extended
+ * operation modifies an entry: clear the cachedReads.
+ */
+ evictAll();
+ return connection.extendedRequestAsync(request, intermediateResponseHandler,
+ resultHandler);
+ }
+
+ @Override
+ public boolean isClosed() {
+ return connection.isClosed();
+ }
+
+ @Override
+ public boolean isValid() {
+ return connection.isValid();
+ }
+
+ @Override
+ public FutureResult<Result> modifyAsync(final ModifyRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler) {
+ evict(request.getName());
+ return connection.modifyAsync(request, intermediateResponseHandler, resultHandler);
+ }
+
+ @Override
+ public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler) {
+ // Simple brute force implementation: clear the cachedReads.
+ evictAll();
+ return connection
+ .modifyDNAsync(request, intermediateResponseHandler, resultHandler);
+ }
+
+ @Override
+ public void removeConnectionEventListener(final ConnectionEventListener listener) {
+ connection.removeConnectionEventListener(listener);
+ }
+
+ /*
+ * Try to re-use a cached result if possible.
+ */
+ @Override
+ public FutureResult<Result> searchAsync(final SearchRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final SearchResultHandler resultHandler) {
+
+ /*
+ * Don't attempt caching if this search is not a read (base
+ * object search) or if the caller passed in an intermediate
+ * response handler.
+ */
+ if (!request.getScope().equals(SearchScope.BASE_OBJECT)
+ || intermediateResponseHandler != null) {
+ return connection.searchAsync(request, intermediateResponseHandler,
+ resultHandler);
+ }
+
+ // This is a read request and a candidate for caching.
+ final CachedRead cachedRead;
+ synchronized (cachedReads) {
+ cachedRead = cachedReads.get(request.getName());
+ }
+ if (cachedRead != null && cachedRead.isMatchingRead(request)) {
+ // The cached read matches this read request.
+ cachedRead.addResultHandler(resultHandler);
+ return cachedRead.getFutureResult();
+ } else {
+ // Cache the read, possibly evicting a non-matching cached read.
+ final CachedRead pendingCachedRead = new CachedRead(request, resultHandler);
+ synchronized (cachedReads) {
+ cachedReads.put(request.getName(), pendingCachedRead);
+ }
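+ /*
+ * The pending read has been published before the search is issued
+ * so that concurrent matching reads can attach to it; their calls
+ * to getFutureResult() block until setFuture() below supplies the
+ * future.
+ */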
+ final FutureResult<Result> future =
+ connection.searchAsync(request, intermediateResponseHandler,
+ pendingCachedRead);
+ pendingCachedRead.setFuture(future);
+ return future;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return connection.toString();
+ }
+
+ private void evict(final DN name) {
+ synchronized (cachedReads) {
+ cachedReads.remove(name);
+ }
+ }
+
+ private void evictAll() {
+ synchronized (cachedReads) {
+ cachedReads.clear();
+ }
+ }
+ };
+ }
+
}
--
Gitblit v1.10.0