| | |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.forgerock.json.fluent.JsonPointer; |
| | | import org.forgerock.json.fluent.JsonValue; |
| | |
| | | ldapFilter == Filter.alwaysTrue() ? Filter |
| | | .objectClassPresent() : ldapFilter, attributes); |
| | | c.getConnection().searchAsync(request, null, new SearchResultHandler() { |
| | | private final AtomicInteger pendingResourceCount = |
| | | new AtomicInteger(); |
| | | private final AtomicReference<ResourceException> pendingResult = |
| | | new AtomicReference<ResourceException>(); |
| | | private final AtomicBoolean resultSent = new AtomicBoolean(); |
| | | /* |
| | | * The following fields are guarded by |
| | | * sequenceLock. In addition, the sequenceLock |
| | | * ensures that we send one JSON resource at a |
| | | * time back to the client. |
| | | */ |
| | | private final Object sequenceLock = new Object(); |
| | | private int pendingResourceCount = 0; |
| | | private ResourceException pendingResult = null; |
| | | private boolean resultSent = false; |
| | | |
| | | @Override |
| | | public boolean handleEntry(final SearchResultEntry entry) { |
| | |
| | | * non-null is if a mapping error has |
| | | * occurred. |
| | | */ |
| | | if (pendingResult.get() != null) { |
| | | return false; |
| | | synchronized (sequenceLock) { |
| | | if (pendingResult != null) { |
| | | return false; |
| | | } |
| | | pendingResourceCount++; |
| | | } |
| | | |
| | | /* |
| | | * FIXME: secondary asynchronous searches |
| | | * will complete in a non-deterministic |
| | | * order and may cause the JSON resources to |
| | | * be returned in a different order to the |
| | | * order in which the primary LDAP search |
| | | * results were received. This is benign at |
| | | * the moment, but will need resolving when |
| | | * we implement server side sorting. A |
| | | * possible fix will be to use a queue of |
| | | * pending resources (futures?). However, |
| | | * the queue cannot be unbounded in case it |
| | | * grows very large, but it cannot be |
| | | * bounded either since that could cause a |
| | | * deadlock between rest2ldap and the LDAP |
| | | * server (imagine the case where the server |
| | | * has a single worker thread which is |
| | | * occupied processing the primary search). |
| | | * The best solution is probably to process |
| | | * the primary search results in batches |
| | | * using the paged results control. |
| | | */ |
| | | final String id = nameStrategy.getResourceId(c, entry); |
| | | final String revision = getRevisionFromEntry(entry); |
| | | final ResultHandler<JsonValue> mapHandler = |
| | | attributeMapper.read(c, new JsonPointer(), entry, |
| | | new ResultHandler<JsonValue>() { |
| | | @Override |
| | | public void handleError(final ResourceException e) { |
| | | pendingResult.compareAndSet(null, e); |
| | | pendingResourceCount.decrementAndGet(); |
| | | completeIfNecessary(); |
| | | synchronized (sequenceLock) { |
| | | pendingResourceCount--; |
| | | completeIfNecessary(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void handleResult(final JsonValue result) { |
| | | final Resource resource = |
| | | new Resource(id, revision, result); |
| | | h.handleResource(resource); |
| | | pendingResourceCount.decrementAndGet(); |
| | | completeIfNecessary(); |
| | | synchronized (sequenceLock) { |
| | | pendingResourceCount--; |
| | | if (!resultSent) { |
| | | h.handleResource(new Resource(id, |
| | | revision, result)); |
| | | } |
| | | completeIfNecessary(); |
| | | } |
| | | } |
| | | }; |
| | | |
| | | pendingResourceCount.incrementAndGet(); |
| | | attributeMapper.read(c, new JsonPointer(), entry, mapHandler); |
| | | }); |
| | | return true; |
| | | } |
| | | |
| | | @Override |
| | | public void handleErrorResult(final ErrorResultException error) { |
| | | pendingResult.compareAndSet(null, asResourceException(error)); |
| | | completeIfNecessary(); |
| | | synchronized (sequenceLock) { |
| | | completeIfNecessary(asResourceException(error)); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | |
| | | |
| | | @Override |
| | | public void handleResult(final Result result) { |
| | | pendingResult.compareAndSet(null, SUCCESS); |
| | | synchronized (sequenceLock) { |
| | | completeIfNecessary(SUCCESS); |
| | | } |
| | | } |
| | | |
| | | /* |
| | | * This method must be invoked with the |
| | | * sequenceLock held. |
| | | */ |
| | | private void completeIfNecessary(final ResourceException e) { |
| | | if (pendingResult == null) { |
| | | pendingResult = e; |
| | | } |
| | | completeIfNecessary(); |
| | | } |
| | | |
| | | /* |
| | | * Close out the query result set if there are |
| | | * no more pending resources and the LDAP result |
| | | * has been received. |
| | | * has been received. This method must be |
| | | * invoked with the sequenceLock held. |
| | | */ |
| | | private void completeIfNecessary() { |
| | | if (pendingResourceCount.get() == 0) { |
| | | final ResourceException result = pendingResult.get(); |
| | | if (result != null && resultSent.compareAndSet(false, true)) { |
| | | if (result == SUCCESS) { |
| | | h.handleResult(new QueryResult()); |
| | | } else { |
| | | h.handleError(result); |
| | | } |
| | | if (pendingResourceCount == 0 && pendingResult != null |
| | | && !resultSent) { |
| | | if (pendingResult == SUCCESS) { |
| | | h.handleResult(new QueryResult()); |
| | | } else { |
| | | h.handleError(pendingResult); |
| | | } |
| | | resultSent = true; |
| | | } |
| | | } |
| | | }); |