From 7983705a0ce4de2fd6e7a4061fe4afa0f9e8c66a Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 12 Sep 2012 20:46:39 +0000
Subject: [PATCH] First part of fix for OPENDJ-590: ConnectionPool may return already closed/disconnected connections
---
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java | 350 +++++------
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java | 242 ++++++++
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java | 49
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java | 763 ++++++++++++-------------
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java | 199 ------
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockConnectionEventListener.java | 112 +++
6 files changed, 912 insertions(+), 803 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java
index 97e8229..71c66bd 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java
@@ -226,42 +226,43 @@
LDAP_CONNECTION_ATTR.get(ctx.getConnection());
if (ldapConnection != null) {
if (messageID == 0) {
+ // Unsolicited notification received.
if ((result.getOID() != null)
&& result.getOID().equals(OID_NOTICE_OF_DISCONNECTION)) {
-
+ // Treat this as a connection error.
final Result errorResult =
Responses
.newResult(result.getResultCode())
.setDiagnosticMessage(result.getDiagnosticMessage());
ldapConnection.close(null, true, errorResult);
- return;
} else {
- // Unsolicited notification received.
ldapConnection.handleUnsolicitedNotification(result);
}
- }
+ } else {
+ final AbstractLDAPFutureResultImpl<?> pendingRequest =
+ ldapConnection.removePendingRequest(messageID);
- final AbstractLDAPFutureResultImpl<?> pendingRequest =
- ldapConnection.removePendingRequest(messageID);
-
- if (pendingRequest != null) {
- if (pendingRequest instanceof LDAPExtendedFutureResultImpl<?>) {
- final LDAPExtendedFutureResultImpl<?> extendedFuture =
- ((LDAPExtendedFutureResultImpl<?>) pendingRequest);
- try {
- handleExtendedResult0(ldapConnection, extendedFuture, result);
- } catch (final DecodeException de) {
- // FIXME: should the connection be closed as
- // well?
- final Result errorResult =
- Responses.newResult(
- ResultCode.CLIENT_SIDE_DECODING_ERROR)
- .setDiagnosticMessage(de.getLocalizedMessage())
- .setCause(de);
- extendedFuture.adaptErrorResult(errorResult);
+ if (pendingRequest != null) {
+ if (pendingRequest instanceof LDAPExtendedFutureResultImpl<?>) {
+ final LDAPExtendedFutureResultImpl<?> extendedFuture =
+ ((LDAPExtendedFutureResultImpl<?>) pendingRequest);
+ try {
+ handleExtendedResult0(ldapConnection, extendedFuture,
+ result);
+ } catch (final DecodeException de) {
+ // FIXME: should the connection be closed as
+ // well?
+ final Result errorResult =
+ Responses.newResult(
+ ResultCode.CLIENT_SIDE_DECODING_ERROR)
+ .setDiagnosticMessage(
+ de.getLocalizedMessage()).setCause(
+ de);
+ extendedFuture.adaptErrorResult(errorResult);
+ }
+ } else {
+ throw new UnexpectedResponseException(messageID, result);
}
- } else {
- throw new UnexpectedResponseException(messageID, result);
}
}
}
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
index 5cbe12c..9d6fb88a 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -31,9 +31,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -83,22 +83,22 @@
/**
* LDAP connection implementation.
- * <p>
- * TODO: handle illegal state exceptions.
*/
final class LDAPConnection extends AbstractAsynchronousConnection implements Connection {
- private final org.glassfish.grizzly.Connection<?> connection;
- private Result connectionInvalidReason;
- private boolean isClosed = false;
- private final List<ConnectionEventListener> listeners =
- new CopyOnWriteArrayList<ConnectionEventListener>();
- private final AtomicInteger nextMsgID = new AtomicInteger(1);
private final AtomicBoolean bindOrStartTLSInProgress = new AtomicBoolean(false);
+ private final org.glassfish.grizzly.Connection<?> connection;
+ private final LDAPWriter ldapWriter = new LDAPWriter();
+ private final AtomicInteger nextMsgID = new AtomicInteger(1);
+ private final LDAPOptions options;
private final ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>> pendingRequests =
new ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>>();
private final Object stateLock = new Object();
- private final LDAPWriter ldapWriter = new LDAPWriter();
- private final LDAPOptions options;
+ // Guarded by stateLock
+ private Result connectionInvalidReason;
+ private boolean failedDueToDisconnect = false;
+ private boolean isClosed = false;
+ private boolean isFailed = false;
+ private List<ConnectionEventListener> listeners = null;
/**
* Creates a new LDAP connection.
@@ -116,60 +116,47 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Void> abandonAsync(final AbandonRequest request) {
final AbstractLDAPFutureResultImpl<?> pendingRequest;
final int messageID = nextMsgID.getAndIncrement();
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- return new CompletedFutureResult<Void>(newErrorResult(connectionInvalidReason),
- messageID);
- }
- if (bindOrStartTLSInProgress.get()) {
- final Result errorResult =
- Responses.newResult(ResultCode.OPERATIONS_ERROR).setDiagnosticMessage(
- "Bind or Start TLS operation in progress");
- return new CompletedFutureResult<Void>(newErrorResult(errorResult), messageID);
- }
-
- // First remove the future associated with the request to be
- // abandoned.
- pendingRequest = pendingRequests.remove(request.getRequestID());
- }
-
- if (pendingRequest == null) {
- // There has never been a request with the specified message ID or
- // the response has already been received and handled. We can ignore
- // this abandon request.
-
- // Message ID will be -1 since no request was sent.
- return new CompletedFutureResult<Void>((Void) null);
- }
-
- pendingRequest.cancel(false);
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.abandonRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- return new CompletedFutureResult<Void>((Void) null, messageID);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ // Remove the future associated with the request to be abandoned.
+ pendingRequest = pendingRequests.remove(request.getRequestID());
}
- } catch (final IOException e) {
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- return new CompletedFutureResult<Void>(newErrorResult(errorResult), messageID);
+ if (pendingRequest == null) {
+ // There has never been a request with the specified message ID or
+ // the response has already been received and handled. We can ignore
+ // this abandon request.
+
+ // Message ID will be -1 since no request was sent.
+ return new CompletedFutureResult<Void>((Void) null);
+ }
+ pendingRequest.cancel(false);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.abandonRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ return new CompletedFutureResult<Void>((Void) null, messageID);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ return new CompletedFutureResult<Void>(e, messageID);
}
}
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Result> addAsync(final AddRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super Result> resultHandler) {
@@ -177,59 +164,67 @@
final LDAPFutureResultImpl future =
new LDAPFutureResultImpl(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.addRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.addRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public void addConnectionEventListener(final ConnectionEventListener listener) {
Validator.ensureNotNull(listener);
- listeners.add(listener);
+ final boolean notifyClose;
+ final boolean notifyErrorOccurred;
+ synchronized (stateLock) {
+ notifyClose = isClosed;
+ notifyErrorOccurred = isFailed;
+ if (!isClosed) {
+ if (listeners == null) {
+ listeners = new LinkedList<ConnectionEventListener>();
+ }
+ listeners.add(listener);
+ }
+ }
+ if (notifyErrorOccurred) {
+ // Use the reason provided in the disconnect notification.
+ listener.handleConnectionError(failedDueToDisconnect,
+ newErrorResult(connectionInvalidReason));
+ }
+ if (notifyClose) {
+ listener.handleConnectionClosed();
+ }
}
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<BindResult> bindAsync(final BindRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super BindResult> resultHandler) {
final int messageID = nextMsgID.getAndIncrement();
-
- BindClient context;
+ final BindClient context;
try {
context =
request.createBindClient(
@@ -253,49 +248,41 @@
new LDAPBindFutureResultImpl(messageID, context, resultHandler,
intermediateResponseHandler, this);
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (!pendingRequests.isEmpty()) {
- future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage(
- "There are other operations pending on this connection"));
- return future;
- }
-
- if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
- future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
-
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- // Use the bind client to get the initial request instead of
- // using the bind request passed to this method.
- final GenericBindRequest initialRequest = context.nextBindRequest();
- ldapWriter.bindRequest(asn1Writer, messageID, 3, initialRequest);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ if (!pendingRequests.isEmpty()) {
+ future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
+ .setDiagnosticMessage(
+ "There are other operations pending on this connection"));
+ return future;
+ }
+ if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
+ future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
+ .setDiagnosticMessage("Bind or Start TLS operation in progress"));
+ return future;
+ }
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
- bindOrStartTLSInProgress.set(false);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ // Use the bind client to get the initial request instead of
+ // using the bind request passed to this method.
+ final GenericBindRequest initialRequest = context.nextBindRequest();
+ ldapWriter.bindRequest(asn1Writer, messageID, 3, initialRequest);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ bindOrStartTLSInProgress.set(false);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
return future;
@@ -304,6 +291,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void close(final UnbindRequest request, final String reason) {
// FIXME: I18N need to internationalize this message.
Validator.ensureNotNull(request);
@@ -316,6 +304,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<CompareResult> compareAsync(final CompareRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super CompareResult> resultHandler) {
@@ -323,45 +312,34 @@
final LDAPCompareFutureResultImpl future =
new LDAPCompareFutureResultImpl(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(Responses.newCompareResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.compareRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.compareRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Result> deleteAsync(final DeleteRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super Result> resultHandler) {
@@ -369,45 +347,34 @@
final LDAPFutureResultImpl future =
new LDAPFutureResultImpl(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.deleteRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.deleteRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
final ExtendedRequest<R> request,
final IntermediateResponseHandler intermediateResponseHandler,
@@ -416,68 +383,54 @@
final LDAPExtendedFutureResultImpl<R> future =
new LDAPExtendedFutureResultImpl<R>(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (request.getOID().equals(StartTLSExtendedRequest.OID)) {
- if (!pendingRequests.isEmpty()) {
- future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
- ResultCode.OPERATIONS_ERROR, "",
- "There are pending operations on this connection"));
- return future;
- }
- if (isTLSEnabled()) {
- future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
- ResultCode.OPERATIONS_ERROR, "",
- "This connection is already TLS enabled"));
- return future;
- }
- if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
- future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
- ResultCode.OPERATIONS_ERROR, "",
- "Bind or Start TLS operation in progress"));
- return future;
- }
- } else {
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
- ResultCode.OPERATIONS_ERROR, "",
- "Bind or Start TLS operation in progress"));
- return future;
- }
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.extendedRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ if (request.getOID().equals(StartTLSExtendedRequest.OID)) {
+ if (!pendingRequests.isEmpty()) {
+ future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
+ ResultCode.OPERATIONS_ERROR, "",
+ "There are pending operations on this connection"));
+ return future;
+ } else if (isTLSEnabled()) {
+ future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
+ ResultCode.OPERATIONS_ERROR, "",
+ "This connection is already TLS enabled"));
+ return future;
+ } else if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
+ future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
+ ResultCode.OPERATIONS_ERROR, "",
+ "Bind or Start TLS operation in progress"));
+ return future;
+ }
+ } else {
+ checkBindOrStartTLSInProgress();
+ }
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
- bindOrStartTLSInProgress.set(false);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.extendedRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ bindOrStartTLSInProgress.set(false);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public boolean isClosed() {
synchronized (stateLock) {
return isClosed;
@@ -487,15 +440,17 @@
/**
* {@inheritDoc}
*/
+ @Override
public boolean isValid() {
synchronized (stateLock) {
- return connectionInvalidReason == null && !isClosed;
+ return isValid0();
}
}
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Result> modifyAsync(final ModifyRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super Result> resultHandler) {
@@ -503,45 +458,34 @@
final LDAPFutureResultImpl future =
new LDAPFutureResultImpl(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.modifyRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.modifyRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super Result> resultHandler) {
@@ -549,53 +493,47 @@
final LDAPFutureResultImpl future =
new LDAPFutureResultImpl(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.modifyDNRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.modifyDNRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public void removeConnectionEventListener(final ConnectionEventListener listener) {
Validator.ensureNotNull(listener);
- listeners.remove(listener);
+ synchronized (stateLock) {
+ if (listeners != null) {
+ listeners.remove(listener);
+ }
+ }
}
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Result> searchAsync(final SearchRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final SearchResultHandler resultHandler) {
@@ -603,47 +541,36 @@
final LDAPSearchFutureResultImpl future =
new LDAPSearchFutureResultImpl(messageID, request, resultHandler,
intermediateResponseHandler, this);
-
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- future.adaptErrorResult(connectionInvalidReason);
- return future;
- }
- if (bindOrStartTLSInProgress.get()) {
- future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
- .setDiagnosticMessage("Bind or Start TLS operation in progress"));
- return future;
- }
- pendingRequests.put(messageID, future);
- }
-
try {
- final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
- try {
- ldapWriter.searchRequest(asn1Writer, messageID, request);
- connection.write(asn1Writer.getBuffer(), null);
- } finally {
- asn1Writer.recycle();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ checkBindOrStartTLSInProgress();
+ pendingRequests.put(messageID, future);
}
- } catch (final IOException e) {
- pendingRequests.remove(messageID);
-
- // FIXME: what other sort of IOExceptions can be thrown?
- // FIXME: Is this the best result code?
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
- connectionErrorOccurred(errorResult);
- future.adaptErrorResult(errorResult);
+ try {
+ final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+ try {
+ ldapWriter.searchRequest(asn1Writer, messageID, request);
+ connection.write(asn1Writer.getBuffer(), null);
+ } finally {
+ asn1Writer.recycle();
+ }
+ } catch (final IOException e) {
+ pendingRequests.remove(messageID);
+ throw adaptRequestIOException(e);
+ }
+ } catch (final ErrorResultException e) {
+ future.adaptErrorResult(e.getResult());
}
-
return future;
}
/**
* {@inheritDoc}
*/
+ @Override
public String toString() {
- StringBuilder builder = new StringBuilder();
+ final StringBuilder builder = new StringBuilder();
builder.append("LDAPConnection(");
builder.append(connection.getLocalAddress());
builder.append(',');
@@ -652,23 +579,11 @@
return builder.toString();
}
- int continuePendingBindRequest(final LDAPBindFutureResultImpl future)
- throws ErrorResultException {
- final int newMsgID = nextMsgID.getAndIncrement();
- synchronized (stateLock) {
- if (connectionInvalidReason != null) {
- throw newErrorResult(connectionInvalidReason);
- }
- pendingRequests.put(newMsgID, future);
- }
- return newMsgID;
- }
-
long cancelExpiredRequests(final long currentTime) {
final long timeout = options.getTimeout(TimeUnit.MILLISECONDS);
long delay = timeout;
if (timeout > 0) {
- for (int requestID : pendingRequests.keySet()) {
+ for (final int requestID : pendingRequests.keySet()) {
final AbstractLDAPFutureResultImpl<?> future = pendingRequests.get(requestID);
if (future != null) {
final long diff = (future.getTimestamp() + timeout) - currentTime;
@@ -687,52 +602,53 @@
return delay;
}
+ /**
+ * Closes this connection, invoking event listeners as needed.
+ *
+ * @param unbindRequest
+ * The client provided unbind request if this is a client
+ * initiated close, or {@code null} if the connection has failed.
+ * @param isDisconnectNotification
+ * {@code true} if this is a connection failure signalled by a
+ * server disconnect notification.
+ * @param reason
+ * The result indicating why the connection was closed.
+ */
void close(final UnbindRequest unbindRequest, final boolean isDisconnectNotification,
final Result reason) {
- boolean notifyClose = false;
- boolean notifyErrorOccurred = false;
-
+ final boolean notifyClose;
+ final boolean notifyErrorOccurred;
+ final List<ConnectionEventListener> tmpListeners;
synchronized (stateLock) {
if (isClosed) {
- // Already closed.
+ // Already closed locally.
return;
- }
-
- if (connectionInvalidReason != null) {
- // Already closed.
- isClosed = true;
- return;
- }
-
- if (unbindRequest != null) {
- // User closed.
- isClosed = true;
+ } else if (unbindRequest != null) {
+ // Local close.
notifyClose = true;
+ notifyErrorOccurred = false;
+ isClosed = true;
+ tmpListeners = listeners;
+ listeners = null; // Prevent future invocations.
+ if (connectionInvalidReason == null) {
+ connectionInvalidReason = reason;
+ }
+ } else if (isFailed) {
+ // Already failed.
+ return;
} else {
+ // Connection has failed and this is the first indication.
+ notifyClose = false;
notifyErrorOccurred = true;
- }
-
- // Mark the connection as invalid.
- if (!isDisconnectNotification) {
- // Connection termination was detected locally, so use the
- // provided
- // reason for all subsequent requests.
+ isFailed = true;
+ failedDueToDisconnect = isDisconnectNotification;
connectionInvalidReason = reason;
- } else {
- // Connection termination was triggered remotely. We don't want
- // to blindly pass on the result code to requests since it could
- // be confused for a genuine response. For example, if the
- // disconnect contained the invalidCredentials result code then
- // this could be misinterpreted as a genuine authentication
- // failure for subsequent bind requests.
- connectionInvalidReason =
- Responses.newResult(ResultCode.CLIENT_SIDE_SERVER_DOWN)
- .setDiagnosticMessage("Connection closed by server");
+ tmpListeners = listeners; // Keep list for client close.
}
}
// First abort all outstanding requests.
- for (int requestID : pendingRequests.keySet()) {
+ for (final int requestID : pendingRequests.keySet()) {
final AbstractLDAPFutureResultImpl<?> future = pendingRequests.remove(requestID);
if (future != null) {
future.adaptErrorResult(connectionInvalidReason);
@@ -741,7 +657,7 @@
// Now try cleanly closing the connection if possible.
// Only send unbind if specified.
- if (unbindRequest != null && !isDisconnectNotification) {
+ if (unbindRequest != null) {
try {
final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
try {
@@ -758,18 +674,29 @@
connection.closeSilently();
// Notify listeners.
- if (notifyClose) {
- for (final ConnectionEventListener listener : listeners) {
- listener.handleConnectionClosed();
+ if (tmpListeners != null) {
+ if (notifyErrorOccurred) {
+ for (final ConnectionEventListener listener : tmpListeners) {
+ // Use the reason provided in the disconnect notification.
+ listener.handleConnectionError(isDisconnectNotification, newErrorResult(reason));
+ }
+ }
+ if (notifyClose) {
+ for (final ConnectionEventListener listener : tmpListeners) {
+ listener.handleConnectionClosed();
+ }
}
}
+ }
- if (notifyErrorOccurred) {
- for (final ConnectionEventListener listener : listeners) {
- // Use the reason provided in the disconnect notification.
- listener.handleConnectionError(isDisconnectNotification, newErrorResult(reason));
- }
+ int continuePendingBindRequest(final LDAPBindFutureResultImpl future)
+ throws ErrorResultException {
+ final int newMsgID = nextMsgID.getAndIncrement();
+ synchronized (stateLock) {
+ checkConnectionIsValid();
+ pendingRequests.put(newMsgID, future);
}
+ return newMsgID;
}
LDAPOptions getLDAPOptions() {
@@ -781,13 +708,14 @@
}
void handleUnsolicitedNotification(final ExtendedResult result) {
- if (isClosed()) {
- // Don't notify after connection is closed.
- return;
+ final List<ConnectionEventListener> tmpListeners;
+ synchronized (stateLock) {
+ tmpListeners = listeners;
}
-
- for (final ConnectionEventListener listener : listeners) {
- listener.handleUnsolicitedNotification(result);
+ if (tmpListeners != null) {
+ for (final ConnectionEventListener listener : tmpListeners) {
+ listener.handleUnsolicitedNotification(result);
+ }
}
}
@@ -801,7 +729,7 @@
void installFilter(final Filter filter) {
synchronized (stateLock) {
// Determine the index where the filter should be added.
- FilterChain oldFilterChain = (FilterChain) connection.getProcessor();
+ final FilterChain oldFilterChain = (FilterChain) connection.getProcessor();
int filterIndex = oldFilterChain.size() - 1;
if (filter instanceof SSLFilter) {
// Beneath any ConnectionSecurityLayerFilters if present,
@@ -815,7 +743,7 @@
}
// Create the new filter chain.
- FilterChain newFilterChain =
+ final FilterChain newFilterChain =
FilterChainBuilder.stateless().addAll(oldFilterChain).add(filterIndex, filter)
.build();
connection.setProcessor(newFilterChain);
@@ -831,7 +759,7 @@
boolean isTLSEnabled() {
synchronized (stateLock) {
final FilterChain currentFilterChain = (FilterChain) connection.getProcessor();
- for (Filter filter : currentFilterChain) {
+ for (final Filter filter : currentFilterChain) {
if (filter instanceof SSLFilter) {
return true;
}
@@ -856,19 +784,56 @@
throw new IllegalStateException("TLS already enabled");
}
- SSLEngineConfigurator sslEngineConfigurator =
+ final SSLEngineConfigurator sslEngineConfigurator =
new SSLEngineConfigurator(sslContext, true, false, false);
sslEngineConfigurator.setEnabledProtocols(protocols.isEmpty() ? null : protocols
.toArray(new String[protocols.size()]));
sslEngineConfigurator.setEnabledCipherSuites(cipherSuites.isEmpty() ? null
: cipherSuites.toArray(new String[cipherSuites.size()]));
- SSLFilter sslFilter = new SSLFilter(null, sslEngineConfigurator);
+ final SSLFilter sslFilter = new SSLFilter(null, sslEngineConfigurator);
installFilter(sslFilter);
sslFilter.handshake(connection, completionHandler);
}
}
+ private ErrorResultException adaptRequestIOException(final IOException e) {
+ // FIXME: what other sort of IOExceptions can be thrown?
+ // FIXME: Is this the best result code?
+ final Result errorResult =
+ Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
+ connectionErrorOccurred(errorResult);
+ return newErrorResult(errorResult);
+ }
+
+ private void checkBindOrStartTLSInProgress() throws ErrorResultException {
+ if (bindOrStartTLSInProgress.get()) {
+ throw newErrorResult(ResultCode.OPERATIONS_ERROR,
+ "Bind or Start TLS operation in progress");
+ }
+ }
+
+ private void checkConnectionIsValid() throws ErrorResultException {
+ if (!isValid0()) {
+ if (failedDueToDisconnect) {
+ // Connection termination was triggered remotely. We don't want
+ // to blindly pass on the result code to requests since it could
+ // be confused for a genuine response. For example, if the
+ // disconnect contained the invalidCredentials result code then
+ // this could be misinterpreted as a genuine authentication
+ // failure for subsequent bind requests.
+ throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN,
+ "Connection closed by server");
+ } else {
+ throw newErrorResult(connectionInvalidReason);
+ }
+ }
+ }
+
private void connectionErrorOccurred(final Result reason) {
close(null, false, reason);
}
+
+ private boolean isValid0() {
+ return !isFailed && !isClosed;
+ }
}
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java
index a6ec1d7..4cc3bab 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java
@@ -33,6 +33,7 @@
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -89,12 +90,12 @@
final class LDAPServerFilter extends BaseFilter {
private abstract class AbstractHandler<R extends Result> implements
IntermediateResponseHandler, ResultHandler<R> {
+ protected final ClientContextImpl context;
protected final int messageID;
- protected final Connection<?> connection;
- protected AbstractHandler(final int messageID, final Connection<?> connection) {
+ protected AbstractHandler(final ClientContextImpl context, final int messageID) {
this.messageID = messageID;
- this.connection = connection;
+ this.context = context;
}
@Override
@@ -102,9 +103,9 @@
final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
try {
LDAP_WRITER.intermediateResponse(asn1Writer, messageID, response);
- connection.write(asn1Writer.getBuffer(), null);
+ context.write(asn1Writer);
} catch (final IOException ioe) {
- notifyConnectionException(connection, ioe);
+ context.handleError(ioe);
return false;
} finally {
asn1Writer.recycle();
@@ -114,8 +115,8 @@
}
private final class AddHandler extends AbstractHandler<Result> {
- private AddHandler(final int messageID, final Connection<?> connection) {
- super(messageID, connection);
+ private AddHandler(final ClientContextImpl context, final int messageID) {
+ super(context, messageID);
}
@Override
@@ -128,9 +129,9 @@
final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
try {
LDAP_WRITER.addResult(asn1Writer, messageID, result);
- connection.write(asn1Writer.getBuffer(), null);
+ context.write(asn1Writer);
} catch (final IOException ioe) {
- notifyConnectionException(connection, ioe);
+ context.handleError(ioe);
} finally {
asn1Writer.recycle();
}
@@ -138,8 +139,8 @@
}
private final class BindHandler extends AbstractHandler<BindResult> {
- private BindHandler(final int messageID, final Connection<?> connection) {
- super(messageID, connection);
+ private BindHandler(final ClientContextImpl context, final int messageID) {
+ super(context, messageID);
}
@Override
@@ -164,9 +165,9 @@
final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
try {
LDAP_WRITER.bindResult(asn1Writer, messageID, result);
- connection.write(asn1Writer.getBuffer(), null);
+ context.write(asn1Writer);
} catch (final IOException ioe) {
- notifyConnectionException(connection, ioe);
+ context.handleError(ioe);
} finally {
asn1Writer.recycle();
}
@@ -175,9 +176,7 @@
private final class ClientContextImpl implements LDAPClientContext {
private final Connection<?> connection;
-
- private volatile boolean isClosed = false;
-
+ private final AtomicBoolean isClosed = new AtomicBoolean();
private ServerConnection<Integer> serverConnection = null;
private ClientContextImpl(final Connection<?> connection) {
@@ -186,18 +185,44 @@
@Override
public void disconnect() {
- LDAPServerFilter.notifyConnectionDisconnected(connection, null, null);
+ disconnect0(null, null);
}
@Override
public void disconnect(final ResultCode resultCode, final String message) {
Validator.ensureNotNull(resultCode);
-
final GenericExtendedResult notification =
Responses.newGenericExtendedResult(resultCode).setOID(
OID_NOTICE_OF_DISCONNECTION).setDiagnosticMessage(message);
sendUnsolicitedNotification(notification);
- LDAPServerFilter.notifyConnectionDisconnected(connection, resultCode, message);
+ disconnect0(resultCode, message);
+ }
+
+ @Override
+ public void enableConnectionSecurityLayer(final ConnectionSecurityLayer layer) {
+ synchronized (this) {
+ installFilter(new ConnectionSecurityLayerFilter(layer, connection.getTransport()
+ .getMemoryManager()));
+ }
+ }
+
+ @Override
+ public void enableTLS(final SSLContext sslContext, final String[] protocols,
+ final String[] suites, final boolean wantClientAuth, final boolean needClientAuth) {
+ Validator.ensureNotNull(sslContext);
+ synchronized (this) {
+ if (isTLSEnabled()) {
+ throw new IllegalStateException("TLS already enabled");
+ }
+
+ final SSLEngineConfigurator sslEngineConfigurator =
+ new SSLEngineConfigurator(sslContext, false, false, false);
+ sslEngineConfigurator.setEnabledCipherSuites(suites);
+ sslEngineConfigurator.setEnabledProtocols(protocols);
+ sslEngineConfigurator.setWantClientAuth(wantClientAuth);
+ sslEngineConfigurator.setNeedClientAuth(needClientAuth);
+ installFilter(new SSLFilter(sslEngineConfigurator, null));
+ }
}
@Override
@@ -232,7 +257,7 @@
*/
@Override
public boolean isClosed() {
- return isClosed;
+ return isClosed.get();
}
@Override
@@ -242,44 +267,18 @@
LDAP_WRITER.extendedResult(asn1Writer, 0, notification);
connection.write(asn1Writer.getBuffer(), null);
} catch (final IOException ioe) {
- LDAPServerFilter.notifyConnectionException(connection, ioe);
+ handleError(ioe);
} finally {
asn1Writer.recycle();
}
}
- @Override
- public void enableConnectionSecurityLayer(final ConnectionSecurityLayer layer) {
- synchronized (this) {
- installFilter(new ConnectionSecurityLayerFilter(layer, connection.getTransport()
- .getMemoryManager()));
- }
- }
-
- @Override
- public void enableTLS(final SSLContext sslContext, final String[] protocols,
- final String[] suites, final boolean wantClientAuth, final boolean needClientAuth) {
- Validator.ensureNotNull(sslContext);
- synchronized (this) {
- if (isTLSEnabled()) {
- throw new IllegalStateException("TLS already enabled");
- }
-
- SSLEngineConfigurator sslEngineConfigurator =
- new SSLEngineConfigurator(sslContext, false, false, false);
- sslEngineConfigurator.setEnabledCipherSuites(suites);
- sslEngineConfigurator.setEnabledProtocols(protocols);
- sslEngineConfigurator.setWantClientAuth(wantClientAuth);
- sslEngineConfigurator.setNeedClientAuth(needClientAuth);
- installFilter(new SSLFilter(sslEngineConfigurator, null));
- }
- }
-
/**
* {@inheritDoc}
*/
+ @Override
public String toString() {
- StringBuilder builder = new StringBuilder();
+ final StringBuilder builder = new StringBuilder();
builder.append("LDAPClientContext(");
builder.append(getLocalAddress());
builder.append(',');
@@ -288,16 +287,67 @@
return builder.toString();
}
- private void close() {
- isClosed = true;
+ public void write(final ASN1BufferWriter asn1Writer) {
+ connection.write(asn1Writer.getBuffer(), null);
+ }
+
+ private void disconnect0(final ResultCode resultCode, final String message) {
+ // Close this connection context.
+ if (isClosed.compareAndSet(false, true)) {
+ try {
+ // Notify the server connection: it may be null if disconnect is
+ // invoked during accept.
+ if (serverConnection != null) {
+ serverConnection.handleConnectionDisconnected(resultCode, message);
+ }
+ } finally {
+ // Close the connection.
+ connection.closeSilently();
+ }
+ }
}
private ServerConnection<Integer> getServerConnection() {
return serverConnection;
}
- private void setServerConnection(final ServerConnection<Integer> serverConnection) {
- this.serverConnection = serverConnection;
+ private void handleClose(final int messageID, final UnbindRequest unbindRequest) {
+ // Close this connection context.
+ if (isClosed.compareAndSet(false, true)) {
+ try {
+ // Notify the server connection: it may be null if disconnect is
+ // invoked during accept.
+ if (serverConnection != null) {
+ serverConnection.handleConnectionClosed(messageID, unbindRequest);
+ }
+ } finally {
+ // If this close was a result of an unbind request then the
+ // connection won't actually be closed yet. To avoid TIME_WAIT TCP
+ // state, let the client disconnect.
+ if (unbindRequest != null) {
+ return;
+ }
+
+ // Close the connection.
+ connection.closeSilently();
+ }
+ }
+ }
+
+ private void handleError(final Throwable error) {
+ // Close this connection context.
+ if (isClosed.compareAndSet(false, true)) {
+ try {
+ // Notify the server connection: it may be null if disconnect is
+ // invoked during accept.
+ if (serverConnection != null) {
+ serverConnection.handleConnectionError(error);
+ }
+ } finally {
+ // Close the connection.
+ connection.closeSilently();
+ }
+ }
}
/**
@@ -313,8 +363,7 @@
int filterIndex = oldFilterChain.size() - 1;
if (filter instanceof SSLFilter) {
// Beneath any ConnectionSecurityLayerFilters if present,
- // otherwise
- // beneath the LDAP filter.
+ // otherwise beneath the LDAP filter.
for (int i = oldFilterChain.size() - 2; i >= 0; i--) {
if (!(oldFilterChain.get(i) instanceof ConnectionSecurityLayerFilter)) {
filterIndex = i + 1;
@@ -339,7 +388,7 @@
private boolean isTLSEnabled() {
synchronized (this) {
final FilterChain currentFilterChain = (FilterChain) connection.getProcessor();
- for (Filter filter : currentFilterChain) {
+ for (final Filter filter : currentFilterChain) {
if (filter instanceof SSLFilter) {
return true;
}
@@ -348,11 +397,14 @@
}
}
+ private void setServerConnection(final ServerConnection<Integer> serverConnection) {
+ this.serverConnection = serverConnection;
+ }
}
private final class CompareHandler extends AbstractHandler<CompareResult> {
- private CompareHandler(final int messageID, final Connection<?> connection) {
- super(messageID, connection);
+ private CompareHandler(final ClientContextImpl context, final int messageID) {
+ super(context, messageID);
}
@Override
@@ -377,9 +429,9 @@
final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
try {
LDAP_WRITER.compareResult(asn1Writer, messageID, result);
- connection.write(asn1Writer.getBuffer(), null);
+ context.write(asn1Writer);
} catch (final IOException ioe) {
- notifyConnectionException(connection, ioe);
+ context.handleError(ioe);
} finally {
asn1Writer.recycle();
}
@@ -387,8 +439,8 @@
}
private final class DeleteHandler extends AbstractHandler<Result> {
- private DeleteHandler(final int messageID, final Connection<?> connection) {
- super(messageID, connection);
+ private DeleteHandler(final ClientContextImpl context, final int messageID) {
+ super(context, messageID);
}
@Override
@@ -401,9 +453,9 @@
final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
try {
LDAP_WRITER.deleteResult(asn1Writer, messageID, result);
- connection.write(asn1Writer.getBuffer(), null);
+ context.write(asn1Writer);
} catch (final IOException ioe) {
- notifyConnectionException(connection, ioe);
+ context.handleError(ioe);
} finally {
asn1Writer.recycle();
}
@@ -411,8 +463,8 @@
}
private final class ExtendedHandler<R extends ExtendedResult> extends AbstractHandler<R> {
- private ExtendedHandler(final int messageID, final Connection<?> connection) {
- super(messageID, connection);
+ private ExtendedHandler(final ClientContextImpl context, final int messageID) {
+ super(context, messageID);
}
@Override
@@ -438,9 +490,9 @@
final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
try {
LDAP_WRITER.extendedResult(asn1Writer, messageID, result);
- connection.write(asn1Writer.getBuffer(), null);
+ context.write(asn1Writer);
} catch (final IOException ioe) {
- notifyConnectionException(connection, ioe);
+ context.handleError(ioe);
} finally {
asn1Writer.recycle();
}
@@ -448,8 +500,8 @@
}
private final class ModifyDNHandler extends AbstractHandler<Result> {
- private ModifyDNHandler(final int messageID, final Connection<?> connection) {
- super(messageID, connection);
+ private ModifyDNHandler(final ClientContextImpl context, final int messageID) {
+ super(context, messageID);
}
@Override
@@ -462,9 +514,9 @@
final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
try {
LDAP_WRITER.modifyDNResult(asn1Writer, messageID, result);
- connection.write(asn1Writer.getBuffer(), null);
+ context.write(asn1Writer);
} catch (final IOException ioe) {
- notifyConnectionException(connection, ioe);
+ context.handleError(ioe);
} finally {
asn1Writer.recycle();
}
@@ -472,8 +524,8 @@
}
private final class ModifyHandler extends AbstractHandler<Result> {
- private ModifyHandler(final int messageID, final Connection<?> connection) {
- super(messageID, connection);
+ private ModifyHandler(final ClientContextImpl context, final int messageID) {
+ super(context, messageID);
}
@Override
@@ -486,9 +538,9 @@
final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
try {
LDAP_WRITER.modifyResult(asn1Writer, messageID, result);
- connection.write(asn1Writer.getBuffer(), null);
+ context.write(asn1Writer);
} catch (final IOException ioe) {
- notifyConnectionException(connection, ioe);
+ context.handleError(ioe);
} finally {
asn1Writer.recycle();
}
@@ -497,8 +549,8 @@
private final class SearchHandler extends AbstractHandler<Result> implements
SearchResultHandler {
- private SearchHandler(final int messageID, final Connection<?> connection) {
- super(messageID, connection);
+ private SearchHandler(final ClientContextImpl context, final int messageID) {
+ super(context, messageID);
}
@Override
@@ -506,9 +558,9 @@
final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
try {
LDAP_WRITER.searchResultEntry(asn1Writer, messageID, entry);
- connection.write(asn1Writer.getBuffer(), null);
+ context.write(asn1Writer);
} catch (final IOException ioe) {
- notifyConnectionException(connection, ioe);
+ context.handleError(ioe);
return false;
} finally {
asn1Writer.recycle();
@@ -526,9 +578,9 @@
final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
try {
LDAP_WRITER.searchResultReference(asn1Writer, messageID, reference);
- connection.write(asn1Writer.getBuffer(), null);
+ context.write(asn1Writer);
} catch (final IOException ioe) {
- notifyConnectionException(connection, ioe);
+ context.handleError(ioe);
return false;
} finally {
asn1Writer.recycle();
@@ -541,9 +593,9 @@
final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
try {
LDAP_WRITER.searchResult(asn1Writer, messageID, result);
- connection.write(asn1Writer.getBuffer(), null);
+ context.write(asn1Writer);
} catch (final IOException ioe) {
- notifyConnectionException(connection, ioe);
+ context.handleError(ioe);
} finally {
asn1Writer.recycle();
}
@@ -554,6 +606,14 @@
// following RFCs: 5289, 4346, 3268,4132 and 4162.
private static final Map<String, Integer> CIPHER_KEY_SIZES;
+ private static final Attribute<ASN1BufferReader> LDAP_ASN1_READER_ATTR =
+ Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("LDAPASN1Reader");
+
+ private static final Attribute<ClientContextImpl> LDAP_CONNECTION_ATTR =
+ Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("LDAPServerConnection");
+
+ private static final LDAPWriter LDAP_WRITER = new LDAPWriter();
+
static {
CIPHER_KEY_SIZES = new LinkedHashMap<String, Integer>();
CIPHER_KEY_SIZES.put("_WITH_AES_256_CBC_", 256);
@@ -572,78 +632,10 @@
CIPHER_KEY_SIZES.put("_WITH_NULL_", 0);
}
- private static final LDAPWriter LDAP_WRITER = new LDAPWriter();
+ private final LDAPReader ldapReader;
- private static final Attribute<ClientContextImpl> LDAP_CONNECTION_ATTR =
- Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("LDAPServerConnection");
-
- private static final Attribute<ASN1BufferReader> LDAP_ASN1_READER_ATTR =
- Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("LDAPASN1Reader");
-
- private static void notifyConnectionClosed(final Connection<?> connection, final int messageID,
- final UnbindRequest unbindRequest) {
- final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(connection);
- if (clientContext != null) {
- // Close the connection context.
- clientContext.close();
-
- // Notify the server connection: it may be null if disconnect is
- // invoked during accept.
- final ServerConnection<Integer> serverConnection = clientContext.getServerConnection();
- if (serverConnection != null) {
- serverConnection.handleConnectionClosed(messageID, unbindRequest);
- }
-
- // If this close was a result of an unbind request then the
- // connection won't actually be closed yet. To avoid TIME_WAIT TCP
- // state, let the client disconnect.
- if (unbindRequest != null) {
- return;
- }
-
- // Close the connection.
- connection.closeSilently();
- }
- }
-
- private static void notifyConnectionDisconnected(final Connection<?> connection,
- final ResultCode resultCode, final String message) {
- final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(connection);
- if (clientContext != null) {
- // Close the connection context.
- clientContext.close();
-
- // Notify the server connection: it may be null if disconnect is
- // invoked during accept.
- final ServerConnection<Integer> serverConnection = clientContext.getServerConnection();
- if (serverConnection != null) {
- serverConnection.handleConnectionDisconnected(resultCode, message);
- }
-
- // Close the connection.
- connection.closeSilently();
- }
- }
-
- private static void notifyConnectionException(final Connection<?> connection,
- final Throwable error) {
- final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(connection);
- if (clientContext != null) {
- // Close the connection context.
- clientContext.close();
-
- // Notify the server connection: it may be null if disconnect is
- // invoked during accept.
- final ServerConnection<Integer> serverConnection = clientContext.getServerConnection();
- if (serverConnection != null) {
- serverConnection.handleConnectionError(error);
- }
-
- // Close the connection.
- connection.closeSilently();
- }
- }
-
+ private final LDAPListenerImpl listener;
+ private final int maxASN1ElementSize;
private final AbstractLDAPMessageHandler<FilterChainContext> serverRequestHandler =
new AbstractLDAPMessageHandler<FilterChainContext>() {
@Override
@@ -664,7 +656,7 @@
LDAP_CONNECTION_ATTR.get(ctx.getConnection());
if (clientContext != null) {
final ServerConnection<Integer> conn = clientContext.getServerConnection();
- final AddHandler handler = new AddHandler(messageID, ctx.getConnection());
+ final AddHandler handler = new AddHandler(clientContext, messageID);
conn.handleAdd(messageID, request, handler, handler);
}
}
@@ -677,7 +669,7 @@
LDAP_CONNECTION_ATTR.get(ctx.getConnection());
if (clientContext != null) {
final ServerConnection<Integer> conn = clientContext.getServerConnection();
- final BindHandler handler = new BindHandler(messageID, ctx.getConnection());
+ final BindHandler handler = new BindHandler(clientContext, messageID);
conn.handleBind(messageID, version, bindContext, handler, handler);
}
}
@@ -689,8 +681,7 @@
LDAP_CONNECTION_ATTR.get(ctx.getConnection());
if (clientContext != null) {
final ServerConnection<Integer> conn = clientContext.getServerConnection();
- final CompareHandler handler =
- new CompareHandler(messageID, ctx.getConnection());
+ final CompareHandler handler = new CompareHandler(clientContext, messageID);
conn.handleCompare(messageID, request, handler, handler);
}
}
@@ -702,8 +693,7 @@
LDAP_CONNECTION_ATTR.get(ctx.getConnection());
if (clientContext != null) {
final ServerConnection<Integer> conn = clientContext.getServerConnection();
- final DeleteHandler handler =
- new DeleteHandler(messageID, ctx.getConnection());
+ final DeleteHandler handler = new DeleteHandler(clientContext, messageID);
conn.handleDelete(messageID, request, handler, handler);
}
}
@@ -717,7 +707,7 @@
if (clientContext != null) {
final ServerConnection<Integer> conn = clientContext.getServerConnection();
final ExtendedHandler<R> handler =
- new ExtendedHandler<R>(messageID, ctx.getConnection());
+ new ExtendedHandler<R>(clientContext, messageID);
conn.handleExtendedRequest(messageID, request, handler, handler);
}
}
@@ -730,7 +720,7 @@
if (clientContext != null) {
final ServerConnection<Integer> conn = clientContext.getServerConnection();
final ModifyDNHandler handler =
- new ModifyDNHandler(messageID, ctx.getConnection());
+ new ModifyDNHandler(clientContext, messageID);
conn.handleModifyDN(messageID, request, handler, handler);
}
}
@@ -742,8 +732,7 @@
LDAP_CONNECTION_ATTR.get(ctx.getConnection());
if (clientContext != null) {
final ServerConnection<Integer> conn = clientContext.getServerConnection();
- final ModifyHandler handler =
- new ModifyHandler(messageID, ctx.getConnection());
+ final ModifyHandler handler = new ModifyHandler(clientContext, messageID);
conn.handleModify(messageID, request, handler, handler);
}
}
@@ -755,8 +744,7 @@
LDAP_CONNECTION_ATTR.get(ctx.getConnection());
if (clientContext != null) {
final ServerConnection<Integer> conn = clientContext.getServerConnection();
- final SearchHandler handler =
- new SearchHandler(messageID, ctx.getConnection());
+ final SearchHandler handler = new SearchHandler(clientContext, messageID);
conn.handleSearch(messageID, request, handler, handler);
}
}
@@ -764,23 +752,23 @@
@Override
public void unbindRequest(final FilterChainContext ctx, final int messageID,
final UnbindRequest request) {
- notifyConnectionClosed(ctx.getConnection(), messageID, request);
+ // Remove the client context causing any subsequent LDAP
+ // traffic to be ignored.
+ final ClientContextImpl clientContext =
+ LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
+ if (clientContext != null) {
+ clientContext.handleClose(messageID, request);
+ }
}
@Override
public void unrecognizedMessage(final FilterChainContext ctx, final int messageID,
final byte messageTag, final ByteString messageBytes) {
- notifyConnectionException(ctx.getConnection(), new UnsupportedMessageException(
- messageID, messageTag, messageBytes));
+ exceptionOccurred(ctx, new UnsupportedMessageException(messageID, messageTag,
+ messageBytes));
}
};
- private final int maxASN1ElementSize;
-
- private final LDAPReader ldapReader;
-
- private final LDAPListenerImpl listener;
-
LDAPServerFilter(final LDAPListenerImpl listener, final LDAPReader ldapReader,
final int maxASN1ElementSize) {
this.listener = listener;
@@ -790,7 +778,10 @@
@Override
public void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
- notifyConnectionException(ctx.getConnection(), error);
+ final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
+ if (clientContext != null) {
+ clientContext.handleError(error);
+ }
}
@Override
@@ -812,7 +803,10 @@
@Override
public NextAction handleClose(final FilterChainContext ctx) throws IOException {
- notifyConnectionClosed(ctx.getConnection(), -1, null);
+ final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
+ if (clientContext != null) {
+ clientContext.handleClose(-1, null);
+ }
return ctx.getStopAction();
}
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
index 9b6f243..19887b1 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -22,13 +22,15 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2011 ForgeRock AS
+ * Portions copyright 2011-2012 ForgeRock AS
*/
package org.forgerock.opendj.ldap;
import static org.fest.assertions.Assertions.assertThat;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,16 +39,22 @@
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import javax.net.ssl.SSLContext;
+import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.DigestMD5SASLBindRequest;
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.responses.BindResult;
+import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.schema.Schema;
import org.forgerock.opendj.ldap.schema.SchemaBuilder;
@@ -65,6 +73,10 @@
*/
@SuppressWarnings("javadoc")
public class ConnectionFactoryTestCase extends SdkTestCase {
+ // Test timeout in ms for tests which need to wait for network events.
+ private static final long TEST_TIMEOUT = 30L;
+ private static final long TEST_TIMEOUT_MS = TEST_TIMEOUT * 1000L;
+
class MyResultHandler implements ResultHandler<Connection> {
// latch.
private final CountDownLatch latch;
@@ -114,8 +126,8 @@
StaticUtils.DEBUG_LOG.setLevel(Level.INFO);
}
- @DataProvider(name = "connectionFactories")
- public Object[][] getConnectionFactories() throws Exception {
+ @DataProvider
+ Object[][] connectionFactories() throws Exception {
Object[][] factories = new Object[21][1];
// HeartBeatConnectionFactory
@@ -429,4 +441,228 @@
assertThat(realConnectionIsClosed[2]).isTrue();
assertThat(realConnectionIsClosed[3]).isTrue();
}
+
+ private static final class CloseNotify {
+ private boolean closeOnAccept;
+ private boolean doBindFirst;
+ private boolean useEventListener;
+ private boolean sendDisconnectNotification;
+
+ private CloseNotify(boolean closeOnAccept, boolean doBindFirst, boolean useEventListener,
+ boolean sendDisconnectNotification) {
+ this.closeOnAccept = closeOnAccept;
+ this.doBindFirst = doBindFirst;
+ this.useEventListener = useEventListener;
+ this.sendDisconnectNotification = sendDisconnectNotification;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("[");
+ if (closeOnAccept) {
+ builder.append(" closeOnAccept");
+ }
+ if (doBindFirst) {
+ builder.append(" doBindFirst");
+ }
+ if (useEventListener) {
+ builder.append(" useEventListener");
+ }
+ if (sendDisconnectNotification) {
+ builder.append(" sendDisconnectNotification");
+ }
+ builder.append(" ]");
+ return builder.toString();
+ }
+ }
+
+ @DataProvider
+ Object[][] closeNotifyConfig() {
+ // @formatter:off
+ return new Object[][] {
+ // closeOnAccept, doBindFirst, useEventListener, sendDisconnectNotification
+
+ // Close on accept.
+ { new CloseNotify(true, false, false, false) },
+ { new CloseNotify(true, false, true, false) },
+
+ // Use disconnect.
+ { new CloseNotify(false, false, false, false) },
+ { new CloseNotify(false, false, false, true) },
+ { new CloseNotify(false, false, true, false) },
+ { new CloseNotify(false, false, true, true) },
+ { new CloseNotify(false, true, false, false) },
+ { new CloseNotify(false, true, false, true) },
+ { new CloseNotify(false, true, true, false) },
+ { new CloseNotify(false, true, true, true) },
+ };
+ // @formatter:on
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(dataProvider = "closeNotifyConfig")
+ public void testCloseNotify(final CloseNotify config) throws Exception {
+ final CountDownLatch connectLatch = new CountDownLatch(1);
+ final AtomicReference<LDAPClientContext> contextHolder =
+ new AtomicReference<LDAPClientContext>();
+
+ final ServerConnectionFactory<LDAPClientContext, Integer> mockServer =
+ mock(ServerConnectionFactory.class);
+ when(mockServer.handleAccept(any(LDAPClientContext.class))).thenAnswer(
+ new Answer<ServerConnection<Integer>>() {
+
+ public ServerConnection<Integer> answer(InvocationOnMock invocation)
+ throws Throwable {
+ // Allow the context to be accessed from outside the mock.
+ contextHolder.set((LDAPClientContext) invocation.getArguments()[0]);
+ connectLatch.countDown(); /* is this needed? */
+ if (config.closeOnAccept) {
+ throw newErrorResult(ResultCode.UNAVAILABLE);
+ } else {
+ // Return a mock connection which always succeeds for binds.
+ ServerConnection<Integer> mockConnection = mock(ServerConnection.class);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ ResultHandler<? super BindResult> resultHandler =
+ (ResultHandler<? super BindResult>) invocation
+ .getArguments()[4];
+ resultHandler.handleResult(Responses
+ .newBindResult(ResultCode.SUCCESS));
+ return null;
+ }
+ }).when(mockConnection).handleBind(anyInt(), anyInt(),
+ any(BindRequest.class), any(IntermediateResponseHandler.class),
+ any(ResultHandler.class));
+ return mockConnection;
+ }
+ }
+ });
+
+ final int port = TestCaseUtils.findFreePort();
+ LDAPListener listener = new LDAPListener(port, mockServer);
+ try {
+ LDAPConnectionFactory clientFactory = new LDAPConnectionFactory("localhost", port);
+ final Connection client = clientFactory.getConnection();
+ connectLatch.await(TEST_TIMEOUT, TimeUnit.SECONDS);
+ MockConnectionEventListener mockListener = null;
+ try {
+ if (config.useEventListener) {
+ mockListener = new MockConnectionEventListener();
+ client.addConnectionEventListener(mockListener);
+ }
+ if (config.doBindFirst) {
+ client.bind("cn=test", "password".toCharArray());
+ }
+ if (!config.closeOnAccept) {
+ // Disconnect using client context.
+ LDAPClientContext context = contextHolder.get();
+ assertThat(context).isNotNull();
+ assertThat(context.isClosed()).isFalse();
+ if (config.sendDisconnectNotification) {
+ context.disconnect(ResultCode.BUSY, "busy");
+ } else {
+ context.disconnect();
+ }
+ assertThat(context.isClosed()).isTrue();
+ }
+ // Block until remote close is signalled.
+ if (mockListener != null) {
+ // Block using listener.
+ mockListener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertThat(mockListener.getInvocationCount()).isEqualTo(1);
+ assertThat(mockListener.isDisconnectNotification()).isEqualTo(
+ config.sendDisconnectNotification);
+ assertThat(mockListener.getError()).isNotNull();
+ } else {
+ // Block by spinning on isValid.
+ waitForCondition(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return !client.isValid();
+ }
+ });
+ }
+ assertThat(client.isValid()).isFalse();
+ assertThat(client.isClosed()).isFalse();
+ } finally {
+ client.close();
+ }
+ // Check state after remote close and local close.
+ assertThat(client.isValid()).isFalse();
+ assertThat(client.isClosed()).isTrue();
+ if (mockListener != null) {
+ mockListener.awaitClose(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertThat(mockListener.getInvocationCount()).isEqualTo(2);
+ }
+ } finally {
+ listener.close();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testUnsolicitedNotifications() throws Exception {
+ final CountDownLatch connectLatch = new CountDownLatch(1);
+ final AtomicReference<LDAPClientContext> contextHolder =
+ new AtomicReference<LDAPClientContext>();
+
+ final ServerConnectionFactory<LDAPClientContext, Integer> mockServer =
+ mock(ServerConnectionFactory.class);
+ when(mockServer.handleAccept(any(LDAPClientContext.class))).thenAnswer(
+ new Answer<ServerConnection<Integer>>() {
+
+ public ServerConnection<Integer> answer(InvocationOnMock invocation)
+ throws Throwable {
+ // Allow the context to be accessed from outside the mock.
+ contextHolder.set((LDAPClientContext) invocation.getArguments()[0]);
+ connectLatch.countDown(); /* is this needed? */
+ return mock(ServerConnection.class);
+ }
+ });
+
+ final int port = TestCaseUtils.findFreePort();
+ LDAPListener listener = new LDAPListener(port, mockServer);
+ try {
+ LDAPConnectionFactory clientFactory = new LDAPConnectionFactory("localhost", port);
+ final Connection client = clientFactory.getConnection();
+ connectLatch.await(TEST_TIMEOUT, TimeUnit.SECONDS);
+ try {
+ MockConnectionEventListener mockListener = new MockConnectionEventListener();
+ client.addConnectionEventListener(mockListener);
+
+ // Send notification.
+ LDAPClientContext context = contextHolder.get();
+ assertThat(context).isNotNull();
+ context.sendUnsolicitedNotification(Responses.newGenericExtendedResult(
+ ResultCode.OTHER).setOID("1.2.3.4"));
+ assertThat(context.isClosed()).isFalse();
+
+ // Block using listener.
+ mockListener.awaitNotification(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertThat(mockListener.getInvocationCount()).isEqualTo(1);
+ assertThat(mockListener.getNotification()).isNotNull();
+ assertThat(mockListener.getNotification().getResultCode()).isEqualTo(
+ ResultCode.OTHER);
+ assertThat(mockListener.getNotification().getOID()).isEqualTo("1.2.3.4");
+ assertThat(client.isValid()).isTrue();
+ assertThat(client.isClosed()).isFalse();
+ } finally {
+ client.close();
+ }
+ } finally {
+ listener.close();
+ }
+ }
+
+ private void waitForCondition(Callable<Boolean> condition) throws Exception {
+ long timeout = System.currentTimeMillis() + TEST_TIMEOUT_MS;
+ while (!condition.call()) {
+ Thread.yield();
+ if (System.currentTimeMillis() > timeout) {
+ throw new TimeoutException("Test timed out after " + TEST_TIMEOUT + " seconds");
+ }
+ }
+ }
}
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
index a8e13d2..1a9fc25 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
@@ -40,7 +40,6 @@
import org.forgerock.opendj.ldap.requests.ExtendedRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
-import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
@@ -60,30 +59,6 @@
*/
public class LDAPListenerTestCase extends SdkTestCase {
- private static class MockConnectionEventListener implements ConnectionEventListener {
- final CountDownLatch closeLatch = new CountDownLatch(1);
- String errorMessage = null;
-
- @Override
- public void handleConnectionClosed() {
- errorMessage = "Unexpected call to handleConnectionClosed";
- closeLatch.countDown();
- }
-
- @Override
- public void handleConnectionError(final boolean isDisconnectNotification,
- final ErrorResultException error) {
- errorMessage = "Unexpected call to handleConnectionError";
- closeLatch.countDown();
- }
-
- @Override
- public void handleUnsolicitedNotification(final ExtendedResult notification) {
- errorMessage = "Unexpected call to handleUnsolicitedNotification";
- closeLatch.countDown();
- }
- }
-
private static class MockServerConnection implements ServerConnection<Integer> {
volatile LDAPClientContext context = null;
final CountDownLatch isConnected = new CountDownLatch(1);
@@ -256,180 +231,6 @@
}
/**
- * Tests connection event listener.
- *
- * @throws Exception
- * If an unexpected error occurred.
- */
- @Test
- public void testConnectionEventListenerClose() throws Exception {
- final MockServerConnection onlineServerConnection = new MockServerConnection();
- final MockServerConnectionFactory onlineServerConnectionFactory =
- new MockServerConnectionFactory(onlineServerConnection);
- final LDAPListener onlineServerListener =
- new LDAPListener("localhost", TestCaseUtils.findFreePort(),
- onlineServerConnectionFactory);
-
- final Connection connection;
- try {
- // Connect and bind.
- connection =
- new LDAPConnectionFactory(onlineServerListener.getSocketAddress())
- .getConnection();
-
- final MockConnectionEventListener listener = new MockConnectionEventListener() {
-
- @Override
- public void handleConnectionClosed() {
- closeLatch.countDown();
- }
- };
-
- connection.addConnectionEventListener(listener);
- Assert.assertEquals(listener.closeLatch.getCount(), 1);
- connection.close();
- listener.closeLatch.await();
- Assert.assertNull(listener.errorMessage);
- } finally {
- onlineServerListener.close();
- }
- }
-
- /**
- * Tests connection event listener.
- *
- * @throws Exception
- * If an unexpected error occurred.
- */
- @Test(enabled = false)
- public void testConnectionEventListenerDisconnect() throws Exception {
- final MockServerConnection onlineServerConnection = new MockServerConnection();
- final MockServerConnectionFactory onlineServerConnectionFactory =
- new MockServerConnectionFactory(onlineServerConnection);
- final LDAPListener onlineServerListener =
- new LDAPListener("localhost", TestCaseUtils.findFreePort(),
- onlineServerConnectionFactory);
-
- final Connection connection;
- try {
- // Connect and bind.
- connection =
- new LDAPConnectionFactory(onlineServerListener.getSocketAddress())
- .getConnection();
-
- final MockConnectionEventListener listener = new MockConnectionEventListener() {
-
- @Override
- public void handleConnectionError(final boolean isDisconnectNotification,
- final ErrorResultException error) {
- if (isDisconnectNotification) {
- errorMessage = "Unexpected disconnect notification";
- }
- closeLatch.countDown();
- }
- };
-
- connection.addConnectionEventListener(listener);
- Assert.assertEquals(listener.closeLatch.getCount(), 1);
- Assert.assertTrue(onlineServerConnection.isConnected.await(10, TimeUnit.SECONDS));
- onlineServerConnection.context.disconnect();
- listener.closeLatch.await();
- Assert.assertNull(listener.errorMessage);
- connection.close();
- } finally {
- onlineServerListener.close();
- }
- }
-
- /**
- * Tests connection event listener.
- *
- * @throws Exception
- * If an unexpected error occurred.
- */
- @Test
- public void testConnectionEventListenerDisconnectNotification() throws Exception {
- final MockServerConnection onlineServerConnection = new MockServerConnection();
- final MockServerConnectionFactory onlineServerConnectionFactory =
- new MockServerConnectionFactory(onlineServerConnection);
- final LDAPListener onlineServerListener =
- new LDAPListener("localhost", TestCaseUtils.findFreePort(),
- onlineServerConnectionFactory);
-
- final Connection connection;
- try {
- // Connect and bind.
- connection =
- new LDAPConnectionFactory(onlineServerListener.getSocketAddress())
- .getConnection();
-
- final MockConnectionEventListener listener = new MockConnectionEventListener() {
-
- @Override
- public void handleConnectionError(final boolean isDisconnectNotification,
- final ErrorResultException error) {
- if (!isDisconnectNotification
- || !error.getResult().getResultCode().equals(ResultCode.BUSY)
- || !error.getResult().getDiagnosticMessage().equals("test")) {
- errorMessage = "Missing disconnect notification: " + error;
- }
- closeLatch.countDown();
- }
- };
-
- connection.addConnectionEventListener(listener);
- Assert.assertEquals(listener.closeLatch.getCount(), 1);
- Assert.assertTrue(onlineServerConnection.isConnected.await(10, TimeUnit.SECONDS));
- onlineServerConnection.context.disconnect(ResultCode.BUSY, "test");
- listener.closeLatch.await();
- Assert.assertNull(listener.errorMessage);
- connection.close();
- } finally {
- onlineServerListener.close();
- }
- }
-
- /**
- * Tests connection event listener.
- *
- * @throws Exception
- * If an unexpected error occurred.
- */
- @Test
- public void testConnectionEventListenerUnbind() throws Exception {
- final MockServerConnection onlineServerConnection = new MockServerConnection();
- final MockServerConnectionFactory onlineServerConnectionFactory =
- new MockServerConnectionFactory(onlineServerConnection);
- final LDAPListener onlineServerListener =
- new LDAPListener("localhost", TestCaseUtils.findFreePort(),
- onlineServerConnectionFactory);
-
- final Connection connection;
- try {
- // Connect and bind.
- connection =
- new LDAPConnectionFactory(onlineServerListener.getSocketAddress())
- .getConnection();
-
- final MockConnectionEventListener listener = new MockConnectionEventListener() {
-
- @Override
- public void handleConnectionClosed() {
- closeLatch.countDown();
- }
- };
-
- connection.addConnectionEventListener(listener);
- Assert.assertEquals(listener.closeLatch.getCount(), 1);
- connection.close(Requests.newUnbindRequest(), "called from unit test");
- listener.closeLatch.await();
- Assert.assertNull(listener.errorMessage);
- } finally {
- onlineServerListener.close();
- }
- }
-
- /**
* Tests basic LDAP listener functionality.
*
* @throws Exception
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockConnectionEventListener.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockConnectionEventListener.java
new file mode 100644
index 0000000..16e56a0
--- /dev/null
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockConnectionEventListener.java
@@ -0,0 +1,112 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2012 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.forgerock.opendj.ldap.responses.ExtendedResult;
+
+/**
+ * A connection event listener which records events and signals when it has been
+ * notified.
+ */
+final class MockConnectionEventListener implements ConnectionEventListener {
+ private final CountDownLatch closedLatch = new CountDownLatch(1);
+ private final CountDownLatch errorLatch = new CountDownLatch(1);
+ private final CountDownLatch notificationLatch = new CountDownLatch(1);
+ private Boolean isDisconnectNotification = null;
+ private ErrorResultException error = null;
+ private ExtendedResult notification = null;
+ private final AtomicInteger invocationCount = new AtomicInteger();
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleConnectionClosed() {
+ invocationCount.incrementAndGet();
+ closedLatch.countDown();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleConnectionError(boolean isDisconnectNotification, ErrorResultException error) {
+ this.isDisconnectNotification = isDisconnectNotification;
+ this.error = error;
+ invocationCount.incrementAndGet();
+ errorLatch.countDown();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleUnsolicitedNotification(ExtendedResult notification) {
+ this.notification = notification;
+ invocationCount.incrementAndGet();
+ notificationLatch.countDown();
+ }
+
+ void awaitClose(long timeout, TimeUnit unit) {
+ await(closedLatch, timeout, unit);
+ }
+
+ void awaitError(long timeout, TimeUnit unit) {
+ await(errorLatch, timeout, unit);
+ }
+
+ void awaitNotification(long timeout, TimeUnit unit) {
+ await(notificationLatch, timeout, unit);
+ }
+
+ Boolean isDisconnectNotification() {
+ return isDisconnectNotification;
+ }
+
+ ErrorResultException getError() {
+ return error;
+ }
+
+ ExtendedResult getNotification() {
+ return notification;
+ }
+
+ private void await(CountDownLatch latch, long timeout, TimeUnit unit) {
+ try {
+ latch.await(timeout, unit);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ int getInvocationCount() {
+ return invocationCount.get();
+ }
+}
--
Gitblit v1.10.0