/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
* or http://forgerock.org/license/CDDLv1.0.html.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at legal-notices/CDDLv1_0.txt.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2010 Sun Microsystems, Inc.
* Portions copyright 2011-2012 ForgeRock AS.
*/
package org.forgerock.opendj.ldif;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.forgerock.opendj.ldap.Connection;
import org.forgerock.opendj.ldap.ErrorResultException;
import org.forgerock.opendj.ldap.ErrorResultIOException;
import org.forgerock.opendj.ldap.FutureResult;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.SearchResultHandler;
import org.forgerock.opendj.ldap.SearchResultReferenceIOException;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
import com.forgerock.opendj.util.Validator;
/**
 * A {@code ConnectionEntryReader} is a bridge from {@code Connection}s to
 * {@code EntryReader}s. A connection entry reader allows applications to
 * iterate over search results as they are returned from the server during a
 * search operation.
 * <p>
 * The search request is issued asynchronously when the reader is constructed.
 * The reading methods ({@link #hasNext()}, {@link #isEntry()},
 * {@link #isReference()}, {@link #readEntry()} and {@link #readReference()})
 * then block the calling thread until the next response has been received.
 * If a search result indicates that the search operation has failed for some
 * reason then the error result is propagated to the caller using an
 * {@code ErrorResultIOException}. If a search result reference is returned
 * while an entry is being read then it is propagated to the caller using a
 * {@code SearchResultReferenceIOException}.
 * <p>
 * The following code illustrates how a {@code ConnectionEntryReader} may be
 * used:
 *
 * <pre>
 * Connection connection = ...;
 * ConnectionEntryReader reader = connection.search("dc=example,dc=com",
 *     SearchScope.WHOLE_SUBTREE, "(objectClass=person)");
 * try
 * {
 *   while (reader.hasNext())
 *   {
 *     if (reader.isEntry())
 *     {
 *       SearchResultEntry entry = reader.readEntry();
 *
 *       // Handle entry...
 *     }
 *     else
 *     {
 *       SearchResultReference ref = reader.readReference();
 *
 *       // Handle continuation reference...
 *     }
 *   }
 * }
 * catch (IOException e)
 * {
 *   // Handle exceptions...
 * }
 * finally
 * {
 *   reader.close();
 * }
 * </pre>
 */
public final class ConnectionEntryReader implements EntryReader {

    /**
     * How long, in milliseconds, a single poll of the response queue waits
     * before re-checking whether the result handler has been interrupted.
     */
    private static final long POLL_INTERVAL_MS = 50;

    /**
     * Result handler that places all responses (entries, references and the
     * final result) in a queue for the reader to consume.
     */
    private static final class BufferHandler implements SearchResultHandler {
        private final BlockingQueue<Response> responses;

        /**
         * Set when the thread delivering responses was interrupted while
         * queuing one; read by the reader so that it can stop waiting.
         */
        private volatile boolean isInterrupted = false;

        private BufferHandler(final BlockingQueue<Response> responses) {
            this.responses = responses;
        }

        @Override
        public boolean handleEntry(final SearchResultEntry entry) {
            return enqueue(entry);
        }

        @Override
        public void handleErrorResult(final ErrorResultException error) {
            enqueue(error.getResult());
        }

        @Override
        public boolean handleReference(final SearchResultReference reference) {
            return enqueue(reference);
        }

        @Override
        public void handleResult(final Result result) {
            enqueue(result);
        }

        /**
         * Queues a response, blocking if the queue is full.
         *
         * @param response
         *            The response to hand over to the reader.
         * @return {@code true} if the response was queued, or {@code false}
         *         if the calling thread was interrupted before it could be
         *         queued.
         */
        private boolean enqueue(final Response response) {
            try {
                responses.put(response);
                return true;
            } catch (final InterruptedException e) {
                // Prevent the reader from waiting for a result that will
                // never arrive.
                isInterrupted = true;
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    private final BufferHandler buffer;
    private final FutureResult<Result> future;

    /**
     * The response which has been taken off the queue but not yet consumed
     * by a read method, or {@code null} if the queue must be polled again.
     */
    private Response nextResponse = null;

    /**
     * Creates a new connection entry reader whose destination is the provided
     * connection using an unbounded {@code LinkedBlockingQueue}.
     *
     * @param connection
     *            The connection to use.
     * @param searchRequest
     *            The search request to retrieve entries with.
     * @throws NullPointerException
     *             If {@code connection} was {@code null}.
     */
    public ConnectionEntryReader(final Connection connection, final SearchRequest searchRequest) {
        this(connection, searchRequest, new LinkedBlockingQueue<Response>());
    }

    /**
     * Creates a new connection entry reader whose destination is the provided
     * connection. The search request is sent immediately.
     *
     * @param connection
     *            The connection to use.
     * @param searchRequest
     *            The search request to retrieve entries with.
     * @param entries
     *            The {@code BlockingQueue} implementation to use when queuing
     *            the returned entries.
     * @throws NullPointerException
     *             If {@code connection} or {@code entries} was {@code null}.
     */
    public ConnectionEntryReader(final Connection connection, final SearchRequest searchRequest,
            final BlockingQueue<Response> entries) {
        Validator.ensureNotNull(connection);
        // Fail before the search is started rather than later, when the
        // handler or the reader first touches the queue.
        Validator.ensureNotNull(entries);
        buffer = new BufferHandler(entries);
        future = connection.searchAsync(searchRequest, null, buffer);
    }

    /**
     * Closes this connection entry reader, canceling the search request if it
     * is still active.
     */
    @Override
    public void close() {
        // Cancel the search if it is still running.
        future.cancel(true);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean hasNext() throws ErrorResultIOException {
        // Poll for the next response if needed.
        final Response r = getNextResponse();
        if (!(r instanceof Result)) {
            // Entry or reference.
            return true;
        }

        // Final result: it is left in place so that repeated calls keep
        // reporting the same outcome.
        final Result result = (Result) r;
        if (result.isSuccess()) {
            return false;
        }

        throw new ErrorResultIOException(newErrorResult(result));
    }

    /**
     * Waits for the next search result entry or reference to become available
     * and returns {@code true} if it is an entry, or {@code false} if it is a
     * reference.
     *
     * @return {@code true} if the next search result is an entry, or
     *         {@code false} if it is a reference.
     * @throws ErrorResultIOException
     *             If there are no more search result entries or references and
     *             the search result code indicates that the search operation
     *             failed for some reason.
     * @throws NoSuchElementException
     *             If there are no more search result entries or references and
     *             the search result code indicates that the search operation
     *             succeeded.
     */
    public boolean isEntry() throws ErrorResultIOException {
        // Throws ErrorResultIOException if search returned error.
        if (!hasNext()) {
            // Search has completed successfully.
            throw new NoSuchElementException();
        }

        // Entry or reference?
        final Response r = nextResponse;
        if (r instanceof SearchResultEntry) {
            return true;
        } else if (r instanceof SearchResultReference) {
            return false;
        } else {
            throw new RuntimeException("Unexpected response type: " + r.getClass().toString());
        }
    }

    /**
     * Waits for the next search result entry or reference to become available
     * and returns {@code true} if it is a reference, or {@code false} if it is
     * an entry.
     *
     * @return {@code true} if the next search result is a reference, or
     *         {@code false} if it is an entry.
     * @throws ErrorResultIOException
     *             If there are no more search result entries or references and
     *             the search result code indicates that the search operation
     *             failed for some reason.
     * @throws NoSuchElementException
     *             If there are no more search result entries or references and
     *             the search result code indicates that the search operation
     *             succeeded.
     */
    public boolean isReference() throws ErrorResultIOException {
        return !isEntry();
    }

    /**
     * Waits for the next search result entry or reference to become available
     * and, if it is an entry, returns it as a {@code SearchResultEntry}. If the
     * next search response is a reference then this method will throw a
     * {@code SearchResultReferenceIOException}. In both cases the response is
     * consumed.
     *
     * @return The next search result entry.
     * @throws SearchResultReferenceIOException
     *             If the next search response was a search result reference.
     *             This connection entry reader may still contain remaining
     *             search results and references which can be retrieved using
     *             additional calls to this method.
     * @throws ErrorResultIOException
     *             If there are no more search result entries or references and
     *             the search result code indicates that the search operation
     *             failed for some reason.
     * @throws NoSuchElementException
     *             If there are no more search result entries or references and
     *             the search result code indicates that the search operation
     *             succeeded.
     */
    @Override
    public SearchResultEntry readEntry() throws SearchResultReferenceIOException,
            ErrorResultIOException {
        if (isEntry()) {
            final SearchResultEntry entry = (SearchResultEntry) nextResponse;
            nextResponse = null;
            return entry;
        } else {
            final SearchResultReference reference = (SearchResultReference) nextResponse;
            nextResponse = null;
            throw new SearchResultReferenceIOException(reference);
        }
    }

    /**
     * Waits for the next search result entry or reference to become available
     * and, if it is a reference, returns it as a {@code SearchResultReference}.
     * If the next search response is an entry then this method will return
     * {@code null} and the entry is left unconsumed.
     *
     * @return The next search result reference, or {@code null} if the next
     *         response was a search result entry.
     * @throws ErrorResultIOException
     *             If there are no more search result entries or references and
     *             the search result code indicates that the search operation
     *             failed for some reason.
     * @throws NoSuchElementException
     *             If there are no more search result entries or references and
     *             the search result code indicates that the search operation
     *             succeeded.
     */
    public SearchResultReference readReference() throws ErrorResultIOException {
        if (isReference()) {
            final SearchResultReference reference = (SearchResultReference) nextResponse;
            nextResponse = null;
            return reference;
        } else {
            return null;
        }
    }

    /**
     * Returns the pending response, polling the queue until one is available.
     *
     * @return The next response; never {@code null}.
     * @throws ErrorResultIOException
     *             With result code {@code CLIENT_SIDE_USER_CANCELLED} if the
     *             calling thread was interrupted while waiting. The thread's
     *             interrupt status is restored before the exception is thrown.
     */
    private Response getNextResponse() throws ErrorResultIOException {
        while (nextResponse == null) {
            try {
                nextResponse = buffer.responses.poll(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
            } catch (final InterruptedException e) {
                // Restore the interrupt status so that code further up the
                // stack can still observe the interruption.
                Thread.currentThread().interrupt();
                final ErrorResultException ere =
                        newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
                throw new ErrorResultIOException(ere);
            }

            if (nextResponse == null && buffer.isInterrupted) {
                // The worker thread processing the result was interrupted so no
                // result will ever arrive. We don't want to hang this thread
                // forever while we wait, so terminate now.
                nextResponse = Responses.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR);
                break;
            }
        }
        return nextResponse;
    }
}