/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE
* or https://OpenDS.dev.java.net/OpenDS.LICENSE.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2010 Sun Microsystems, Inc.
*/
package org.opends.sdk.ldif;
import java.io.InterruptedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.opends.sdk.*;
import org.opends.sdk.requests.SearchRequest;
import org.opends.sdk.responses.*;
import com.sun.opends.sdk.util.Validator;
/**
* A {@code ConnectionEntryReader} is a bridge from
* {@code AsynchronousConnection}s to {@code EntryReader}s. A connection entry
* reader allows applications to iterate over search results as they are
* returned from the server during a search operation.
*
* The Search operation is performed synchronously, blocking until a search
* result entry is received. If a search result indicates that the search
* operation has failed for some reason then the error result is propagated to
* the caller using an {@code ErrorResultIOException}. If a search result
* reference is returned then it is propagated to the caller using a
* {@code SearchResultReferenceIOException}.
*
* The following code illustrates how a {@code ConnectionEntryReader} may be
* used:
*
*
* Connection connection = ...;
* ConnectionEntryReader results = connection.search(
* "dc=example,dc=com",
* SearchScope.WHOLE_SUBTREE,
* "(objectClass=person)");
* SearchResultEntry entry;
* try
* {
* while ((entry = results.readEntry()) != null)
* {
* // Process search result entry.
* }
* }
* catch (Exception e)
* {
* // Handle exceptions
* }
* finally
* {
* results.close();
* }
*
*/
public final class ConnectionEntryReader implements EntryReader
{
/**
* Result handler that places all responses in a queue.
*/
private final static class BufferHandler implements SearchResultHandler
{
private final BlockingQueue responses;
private volatile boolean isInterrupted = false;
private BufferHandler(final BlockingQueue responses)
{
this.responses = responses;
}
@Override
public boolean handleEntry(final SearchResultEntry entry)
{
try
{
responses.put(entry);
return true;
}
catch (final InterruptedException e)
{
// Prevent the reader from waiting for a result that will never arrive.
isInterrupted = true;
Thread.currentThread().interrupt();
return false;
}
}
@Override
public void handleErrorResult(final ErrorResultException error)
{
try
{
responses.put(error.getResult());
}
catch (final InterruptedException e)
{
// Prevent the reader from waiting for a result that will never arrive.
isInterrupted = true;
Thread.currentThread().interrupt();
}
}
@Override
public boolean handleReference(final SearchResultReference reference)
{
try
{
responses.put(reference);
return true;
}
catch (final InterruptedException e)
{
// Prevent the reader from waiting for a result that will never arrive.
isInterrupted = true;
Thread.currentThread().interrupt();
return false;
}
}
@Override
public void handleResult(final Result result)
{
try
{
responses.put(result);
}
catch (final InterruptedException e)
{
// Prevent the reader from waiting for a result that will never arrive.
isInterrupted = true;
Thread.currentThread().interrupt();
}
}
}
private final BufferHandler buffer;
private final FutureResult future;
/**
* Creates a new connection entry reader whose destination is the provided
* connection using an unbounded {@code LinkedBlockingQueue}.
*
* @param connection
* The connection to use.
* @param searchRequest
* The search request to retrieve entries with.
* @throws NullPointerException
* If {@code connection} was {@code null}.
*/
public ConnectionEntryReader(final AsynchronousConnection connection,
final SearchRequest searchRequest) throws NullPointerException
{
this(connection, searchRequest, new LinkedBlockingQueue());
}
/**
* Creates a new connection entry reader whose destination is the provided
* connection.
*
* @param connection
* The connection to use.
* @param searchRequest
* The search request to retrieve entries with.
* @param entries
* The {@code BlockingQueue} implementation to use when queuing the
* returned entries.
* @throws NullPointerException
* If {@code connection} was {@code null}.
*/
public ConnectionEntryReader(final AsynchronousConnection connection,
final SearchRequest searchRequest, final BlockingQueue entries)
throws NullPointerException
{
Validator.ensureNotNull(connection);
buffer = new BufferHandler(entries);
future = connection.search(searchRequest, buffer);
}
/**
* Closes this connection entry reader, cancelling the search request if it is
* still active.
*/
@Override
public void close()
{
// Cancel the search if it is still running.
future.cancel(true);
}
/**
* Returns the next search result entry contained in the search results,
* waiting if necessary until one becomes available.
*
* @return The next search result entry, or {@code null} if there are no more
* entries in the search results.
* @throws SearchResultReferenceIOException
* If the next search response was a search result reference. This
* connection entry reader may still contain remaining search
* results and references which can be retrieved using additional
* calls to this method.
* @throws ErrorResultIOException
* If the result code indicates that the search operation failed for
* some reason.
* @throws InterruptedIOException
* If the current thread was interrupted while waiting.
*/
@Override
public SearchResultEntry readEntry() throws SearchResultReferenceIOException,
ErrorResultIOException, InterruptedIOException
{
Response r;
try
{
while ((r = buffer.responses.poll(50, TimeUnit.MILLISECONDS)) == null)
{
if (buffer.isInterrupted)
{
// The worker thread processing the result was interrupted so no
// result will ever arrive. We don't want to hang this thread forever
// while we wait, so terminate now.
r = Responses.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR);
break;
}
}
}
catch (final InterruptedException e)
{
throw new InterruptedIOException(e.getMessage());
}
if (r instanceof SearchResultEntry)
{
return (SearchResultEntry) r;
}
else if (r instanceof SearchResultReference)
{
throw new SearchResultReferenceIOException((SearchResultReference) r);
}
else if (r instanceof Result)
{
final Result result = (Result) r;
if (result.isSuccess())
{
return null;
}
else
{
throw new ErrorResultIOException(ErrorResultException.wrap(result));
}
}
else
{
throw new RuntimeException("Unexpected response type: "
+ r.getClass().toString());
}
}
}