From 7983705a0ce4de2fd6e7a4061fe4afa0f9e8c66a Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 12 Sep 2012 20:46:39 +0000
Subject: [PATCH] First part of fix for OPENDJ-590: ConnectionPool may return already closed/disconnected connections
---
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java | 763 ++++++++++++++++++++++++++++------------------------------
1 files changed, 364 insertions(+), 399 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
index 5cbe12c..9d6fb88a 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -31,9 +31,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -83,22 +83,22 @@
/**
* LDAP connection implementation.
- * <p>
- * TODO: handle illegal state exceptions.
*/
final class LDAPConnection extends AbstractAsynchronousConnection implements Connection {
- private final org.glassfish.grizzly.Connection<?> connection;
- private Result connectionInvalidReason;
- private boolean isClosed = false;
- private final List<ConnectionEventListener> listeners =
- new CopyOnWriteArrayList<ConnectionEventListener>();
- private final AtomicInteger nextMsgID = new AtomicInteger(1);
private final AtomicBoolean bindOrStartTLSInProgress = new AtomicBoolean(false);
+ private final org.glassfish.grizzly.Connection<?> connection;
+ private final LDAPWriter ldapWriter = new LDAPWriter();
+ private final AtomicInteger nextMsgID = new AtomicInteger(1);
+ private final LDAPOptions options;
private final ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>> pendingRequests =
new ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>>();
private final Object stateLock = new Object();
- private final LDAPWriter ldapWriter = new LDAPWriter();
- private final LDAPOptions options;
+ // Guarded by stateLock
+ private Result connectionInvalidReason;
+ private boolean failedDueToDisconnect = false;
+ private boolean isClosed = false;
+ private boolean isFailed = false;
+ private List<ConnectionEventListener> listeners = null;
/**
* Creates a new LDAP connection.
@@ -116,60 +116,47 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Void> abandonAsync(final AbandonRequest request) {
final AbstractLDAPFutureResultImpl<?> pendingRequest;
final int messageID = nextMsgID.getAndIncrement();
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- return new CompletedFutureResult<Void>(newErrorResult(connectionInvalidReason),
- messageID);
- }
- if (bindOrStartTLSInProgress.get()) {
- final Result errorResult =
- Responses.newResult(ResultCode.OPERATIONS_ERROR).setDiagnosticMessage(
- "Bind or Start TLS operation in progress");
- return new CompletedFutureResult<Void>(newErrorResult(errorResult), messageID);
- }
-
- // First remove the future associated with the request to be
- // abandoned.
- pendingRequest = pendingRequests.remove(request.getRequestID());
- }
-
- if (pendingRequest == null) {
- // There has never been a request with the specified message ID or
- // the response has already been received and handled. We can ignore
- // this abandon request.
-
- // Message ID will be -1 since no request was sent.
- return new CompletedFutureResult<Void>((Void) null);
- }
-
- pendingRequest.cancel(false);
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.abandonRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- return new CompletedFutureResult<Void>((Void) null, messageID);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ // Remove the future associated with the request to be abandoned.
+ pendingRequest = pendingRequests.remove(request.getRequestID());
}
- } catch (final IOException e) {
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- return new CompletedFutureResult<Void>(newErrorResult(errorResult), messageID);
+ if (pendingRequest == null) {
+ // There has never been a request with the specified message ID or
+ // the response has already been received and handled. We can ignore
+ // this abandon request.
+
+ // Message ID will be -1 since no request was sent.
+ return new CompletedFutureResult<Void>((Void) null);
+ }
+ pendingRequest.cancel(false);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.abandonRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ return new CompletedFutureResult<Void>((Void) null, messageID);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ return new CompletedFutureResult<Void>(e, messageID);
}
}
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Result> addAsync(final AddRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super Result> resultHandler) {
@@ -177,59 +164,67 @@
final LDAPFutureResultImpl future =
new LDAPFutureResultImpl(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.addRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.addRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public void addConnectionEventListener(final ConnectionEventListener listener) {
Validator.ensureNotNull(listener);
- listeners.add(listener);
+ final boolean notifyClose;
+ final boolean notifyErrorOccurred;
+ synchronized (stateLock) {
+ notifyClose = isClosed;
+ notifyErrorOccurred = isFailed;
+ if (!isClosed) {
+ if (listeners == null) {
+ listeners = new LinkedList<ConnectionEventListener>();
+ }
+ listeners.add(listener);
+ }
+ }
+ if (notifyErrorOccurred) {
+ // Use the reason provided in the disconnect notification.
+ listener.handleConnectionError(failedDueToDisconnect,
+ newErrorResult(connectionInvalidReason));
+ }
+ if (notifyClose) {
+ listener.handleConnectionClosed();
+ }
}
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<BindResult> bindAsync(final BindRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super BindResult> resultHandler) {
final int messageID = nextMsgID.getAndIncrement();
-
- BindClient context;
+ final BindClient context;
try {
context =
request.createBindClient(
@@ -253,49 +248,41 @@
new LDAPBindFutureResultImpl(messageID, context, resultHandler,
intermediateResponseHandler, this);
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (!pendingRequests.isEmpty()) {
- future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage(
- "There are other operations pending on this connection"));
- return future;
- }
-
- if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
- future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
-
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- // Use the bind client to get the initial request instead of
- // using the bind request passed to this method.
- final GenericBindRequest initialRequest = context.nextBindRequest();
- ldapWriter.bindRequest(asn1Writer, messageID, 3, initialRequest);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ if (!pendingRequests.isEmpty()) {
+ future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
+ .setDiagnosticMessage(
+ "There are other operations pending on this connection"));
+ return future;
+ }
+ if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
+ future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
+ .setDiagnosticMessage("Bind or Start TLS operation in progress"));
+ return future;
+ }
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
- bindOrStartTLSInProgress.set(false);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ // Use the bind client to get the initial request instead of
+ // using the bind request passed to this method.
+ final GenericBindRequest initialRequest = context.nextBindRequest();
+ ldapWriter.bindRequest(asn1Writer, messageID, 3, initialRequest);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ bindOrStartTLSInProgress.set(false);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
return future;
@@ -304,6 +291,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void close(final UnbindRequest request, final String reason) {
// FIXME: I18N need to internationalize this message.
Validator.ensureNotNull(request);
@@ -316,6 +304,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<CompareResult> compareAsync(final CompareRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super CompareResult> resultHandler) {
@@ -323,45 +312,34 @@
final LDAPCompareFutureResultImpl future =
new LDAPCompareFutureResultImpl(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(Responses.newCompareResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.compareRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.compareRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Result> deleteAsync(final DeleteRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super Result> resultHandler) {
@@ -369,45 +347,34 @@
final LDAPFutureResultImpl future =
new LDAPFutureResultImpl(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.deleteRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.deleteRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
final ExtendedRequest<R> request,
final IntermediateResponseHandler intermediateResponseHandler,
@@ -416,68 +383,54 @@
final LDAPExtendedFutureResultImpl<R> future =
new LDAPExtendedFutureResultImpl<R>(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (request.getOID().equals(StartTLSExtendedRequest.OID)) {
- if (!pendingRequests.isEmpty()) {
- future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
- ResultCode.OPERATIONS_ERROR, "",
- "There are pending operations on this connection"));
- return future;
- }
- if (isTLSEnabled()) {
- future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
- ResultCode.OPERATIONS_ERROR, "",
- "This connection is already TLS enabled"));
- return future;
- }
- if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
- future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
- ResultCode.OPERATIONS_ERROR, "",
- "Bind or Start TLS operation in progress"));
- return future;
- }
- } else {
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
- ResultCode.OPERATIONS_ERROR, "",
- "Bind or Start TLS operation in progress"));
- return future;
- }
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.extendedRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ if (request.getOID().equals(StartTLSExtendedRequest.OID)) {
+ if (!pendingRequests.isEmpty()) {
+ future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
+ ResultCode.OPERATIONS_ERROR, "",
+ "There are pending operations on this connection"));
+ return future;
+ } else if (isTLSEnabled()) {
+ future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
+ ResultCode.OPERATIONS_ERROR, "",
+ "This connection is already TLS enabled"));
+ return future;
+ } else if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
+ future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
+ ResultCode.OPERATIONS_ERROR, "",
+ "Bind or Start TLS operation in progress"));
+ return future;
+ }
+ } else {
+ checkBindOrStartTLSInProgress();
+ }
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
- bindOrStartTLSInProgress.set(false);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.extendedRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ bindOrStartTLSInProgress.set(false);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public boolean isClosed() {
synchronized (stateLock) {
return isClosed;
@@ -487,15 +440,17 @@
/**
* {@inheritDoc}
*/
+ @Override
public boolean isValid() {
synchronized (stateLock) {
- return connectionInvalidReason == null && !isClosed;
+ return isValid0();
}
}
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Result> modifyAsync(final ModifyRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super Result> resultHandler) {
@@ -503,45 +458,34 @@
final LDAPFutureResultImpl future =
new LDAPFutureResultImpl(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.modifyRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.modifyRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super Result> resultHandler) {
@@ -549,53 +493,47 @@
final LDAPFutureResultImpl future =
new LDAPFutureResultImpl(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.modifyDNRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.modifyDNRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public void removeConnectionEventListener(final ConnectionEventListener listener) {
Validator.ensureNotNull(listener);
- listeners.remove(listener);
+ synchronized (stateLock) {
+ if (listeners != null) {
+ listeners.remove(listener);
+ }
+ }
}
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Result> searchAsync(final SearchRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final SearchResultHandler resultHandler) {
@@ -603,47 +541,36 @@
final LDAPSearchFutureResultImpl future =
new LDAPSearchFutureResultImpl(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.searchRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.searchRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public String toString() {
- StringBuilder builder = new StringBuilder();
+ final StringBuilder builder = new StringBuilder();
builder.append("LDAPConnection(");
builder.append(connection.getLocalAddress());
builder.append(',');
@@ -652,23 +579,11 @@
return builder.toString();
}
- int continuePendingBindRequest(final LDAPBindFutureResultImpl future)
- throws ErrorResultException {
- final int newMsgID = nextMsgID.getAndIncrement();
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- throw newErrorResult(connectionInvalidReason);
- }
- pendingRequests.put(newMsgID, future);
- }
- return newMsgID;
- }
-
long cancelExpiredRequests(final long currentTime) {
final long timeout = options.getTimeout(TimeUnit.MILLISECONDS);
long delay = timeout;
if (timeout > 0) {
- for (int requestID : pendingRequests.keySet()) {
+ for (final int requestID : pendingRequests.keySet()) {
final AbstractLDAPFutureResultImpl<?> future = pendingRequests.get(requestID);
if (future != null) {
final long diff = (future.getTimestamp() + timeout) - currentTime;
@@ -687,52 +602,53 @@
return delay;
}
+ /**
+ * Closes this connection, invoking event listeners as needed.
+ *
+ * @param unbindRequest
+ * The client provided unbind request if this is a client
+ * initiated close, or {@code null} if the connection has failed.
+ * @param isDisconnectNotification
+ * {@code true} if this is a connection failure signalled by a
+ * server disconnect notification.
+ * @param reason
+ * The result indicating why the connection was closed.
+ */
void close(final UnbindRequest unbindRequest, final boolean isDisconnectNotification,
final Result reason) {
- boolean notifyClose = false;
- boolean notifyErrorOccurred = false;
-
+ final boolean notifyClose;
+ final boolean notifyErrorOccurred;
+ final List<ConnectionEventListener> tmpListeners;
synchronized (stateLock) {
if (isClosed) {
- // Already closed.
+ // Already closed locally.
return;
- }
-
- if (connectionInvalidReason != null) {
- // Already closed.
- isClosed = true;
- return;
- }
-
- if (unbindRequest != null) {
- // User closed.
- isClosed = true;
+ } else if (unbindRequest != null) {
+ // Local close.
notifyClose = true;
+ notifyErrorOccurred = false;
+ isClosed = true;
+ tmpListeners = listeners;
+ listeners = null; // Prevent future invocations.
+ if (connectionInvalidReason == null) {
+ connectionInvalidReason = reason;
+ }
+ } else if (isFailed) {
+ // Already failed.
+ return;
} else {
+ // Connection has failed and this is the first indication.
+ notifyClose = false;
notifyErrorOccurred = true;
- }
-
- // Mark the connection as invalid.
- if (!isDisconnectNotification) {
- // Connection termination was detected locally, so use the
- // provided
- // reason for all subsequent requests.
+ isFailed = true;
+ failedDueToDisconnect = isDisconnectNotification;
connectionInvalidReason = reason;
- } else {
- // Connection termination was triggered remotely. We don't want
- // to blindly pass on the result code to requests since it could
- // be confused for a genuine response. For example, if the
- // disconnect contained the invalidCredentials result code then
- // this could be misinterpreted as a genuine authentication
- // failure for subsequent bind requests.
- connectionInvalidReason =
- Responses.newResult(ResultCode.CLIENT_SIDE_SERVER_DOWN)
- .setDiagnosticMessage("Connection closed by server");
+ tmpListeners = listeners; // Keep list for client close.
}
}
// First abort all outstanding requests.
- for (int requestID : pendingRequests.keySet()) {
+ for (final int requestID : pendingRequests.keySet()) {
final AbstractLDAPFutureResultImpl<?> future = pendingRequests.remove(requestID);
if (future != null) {
future.adaptErrorResult(connectionInvalidReason);
@@ -741,7 +657,7 @@
// Now try cleanly closing the connection if possible.
// Only send unbind if specified.
- if (unbindRequest != null && !isDisconnectNotification) {
+ if (unbindRequest != null) {
try {
final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
try {
@@ -758,18 +674,29 @@
connection.closeSilently();
// Notify listeners.
- if (notifyClose) {
- for (final ConnectionEventListener listener : listeners) {
- listener.handleConnectionClosed();
+ if (tmpListeners != null) {
+ if (notifyErrorOccurred) {
+ for (final ConnectionEventListener listener : tmpListeners) {
+ // Use the reason provided in the disconnect notification.
+ listener.handleConnectionError(isDisconnectNotification, newErrorResult(reason));
+ }
+ }
+ if (notifyClose) {
+ for (final ConnectionEventListener listener : tmpListeners) {
+ listener.handleConnectionClosed();
+ }
}
}
+ }
- if (notifyErrorOccurred) {
- for (final ConnectionEventListener listener : listeners) {
- // Use the reason provided in the disconnect notification.
- listener.handleConnectionError(isDisconnectNotification, newErrorResult(reason));
- }
+ int continuePendingBindRequest(final LDAPBindFutureResultImpl future)
+ throws ErrorResultException {
+ final int newMsgID = nextMsgID.getAndIncrement();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ pendingRequests.put(newMsgID, future);
}
+ return newMsgID;
}
LDAPOptions getLDAPOptions() {
@@ -781,13 +708,14 @@
}
void handleUnsolicitedNotification(final ExtendedResult result) {
- if (isClosed()) {
- // Don't notify after connection is closed.
- return;
+ final List<ConnectionEventListener> tmpListeners;
+ synchronized (stateLock) {
+ tmpListeners = listeners;
}
-
- for (final ConnectionEventListener listener : listeners) {
- listener.handleUnsolicitedNotification(result);
+ if (tmpListeners != null) {
+ for (final ConnectionEventListener listener : tmpListeners) {
+ listener.handleUnsolicitedNotification(result);
+ }
}
}
@@ -801,7 +729,7 @@
void installFilter(final Filter filter) {
synchronized (stateLock) {
// Determine the index where the filter should be added.
- FilterChain oldFilterChain = (FilterChain) connection.getProcessor();
+ final FilterChain oldFilterChain = (FilterChain) connection.getProcessor();
int filterIndex = oldFilterChain.size() - 1;
if (filter instanceof SSLFilter) {
// Beneath any ConnectionSecurityLayerFilters if present,
@@ -815,7 +743,7 @@
}
// Create the new filter chain.
- FilterChain newFilterChain =
+ final FilterChain newFilterChain =
FilterChainBuilder.stateless().addAll(oldFilterChain).add(filterIndex, filter)
.build();
connection.setProcessor(newFilterChain);
@@ -831,7 +759,7 @@
boolean isTLSEnabled() {
synchronized (stateLock) {
final FilterChain currentFilterChain = (FilterChain) connection.getProcessor();
- for (Filter filter : currentFilterChain) {
+ for (final Filter filter : currentFilterChain) {
if (filter instanceof SSLFilter) {
return true;
}
@@ -856,19 +784,56 @@
throw new IllegalStateException("TLS already enabled");
}
- SSLEngineConfigurator sslEngineConfigurator =
+ final SSLEngineConfigurator sslEngineConfigurator =
new SSLEngineConfigurator(sslContext, true, false, false);
sslEngineConfigurator.setEnabledProtocols(protocols.isEmpty() ? null : protocols
.toArray(new String[protocols.size()]));
sslEngineConfigurator.setEnabledCipherSuites(cipherSuites.isEmpty() ? null
: cipherSuites.toArray(new String[cipherSuites.size()]));
- SSLFilter sslFilter = new SSLFilter(null, sslEngineConfigurator);
+ final SSLFilter sslFilter = new SSLFilter(null, sslEngineConfigurator);
installFilter(sslFilter);
sslFilter.handshake(connection, completionHandler);
}
}
+ private ErrorResultException adaptRequestIOException(final IOException e) {
+ // FIXME: what other sort of IOExceptions can be thrown?
+ // FIXME: Is this the best result code?
+ final Result errorResult =
+ Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
+ connectionErrorOccurred(errorResult);
+ return newErrorResult(errorResult);
+ }
+
+ private void checkBindOrStartTLSInProgress() throws ErrorResultException {
+ if (bindOrStartTLSInProgress.get()) {
+ throw newErrorResult(ResultCode.OPERATIONS_ERROR,
+ "Bind or Start TLS operation in progress");
+ }
+ }
+
+ private void checkConnectionIsValid() throws ErrorResultException {
+ if (!isValid0()) {
+ if (failedDueToDisconnect) {
+ // Connection termination was triggered remotely. We don't want
+ // to blindly pass on the result code to requests since it could
+ // be confused for a genuine response. For example, if the
+ // disconnect contained the invalidCredentials result code then
+ // this could be misinterpreted as a genuine authentication
+ // failure for subsequent bind requests.
+ throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN,
+ "Connection closed by server");
+ } else {
+ throw newErrorResult(connectionInvalidReason);
+ }
+ }
+ }
+
private void connectionErrorOccurred(final Result reason) {
close(null, false, reason);
}
+
+ private boolean isValid0() {
+ return !isFailed && !isClosed;
+ }
}
--
Gitblit v1.10.0