From 7983705a0ce4de2fd6e7a4061fe4afa0f9e8c66a Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 12 Sep 2012 20:46:39 +0000
Subject: [PATCH] First part of fix for OPENDJ-590: ConnectionPool may return already closed/disconnected connections

---
 opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java |  763 ++++++++++++++++++++++++++++------------------------------
 1 files changed, 364 insertions(+), 399 deletions(-)

diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
index 5cbe12c..9d6fb88a 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -31,9 +31,9 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -83,22 +83,22 @@
 
 /**
  * LDAP connection implementation.
- * <p>
- * TODO: handle illegal state exceptions.
  */
 final class LDAPConnection extends AbstractAsynchronousConnection implements Connection {
-    private final org.glassfish.grizzly.Connection<?> connection;
-    private Result connectionInvalidReason;
-    private boolean isClosed = false;
-    private final List<ConnectionEventListener> listeners =
-            new CopyOnWriteArrayList<ConnectionEventListener>();
-    private final AtomicInteger nextMsgID = new AtomicInteger(1);
     private final AtomicBoolean bindOrStartTLSInProgress = new AtomicBoolean(false);
+    private final org.glassfish.grizzly.Connection<?> connection;
+    private final LDAPWriter ldapWriter = new LDAPWriter();
+    private final AtomicInteger nextMsgID = new AtomicInteger(1);
+    private final LDAPOptions options;
     private final ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>> pendingRequests =
             new ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>>();
     private final Object stateLock = new Object();
-    private final LDAPWriter ldapWriter = new LDAPWriter();
-    private final LDAPOptions options;
+    // Guarded by stateLock
+    private Result connectionInvalidReason;
+    private boolean failedDueToDisconnect = false;
+    private boolean isClosed = false;
+    private boolean isFailed = false;
+    private List<ConnectionEventListener> listeners = null;
 
     /**
      * Creates a new LDAP connection.
@@ -116,60 +116,47 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<Void> abandonAsync(final AbandonRequest request) {
         final AbstractLDAPFutureResultImpl<?> pendingRequest;
         final int messageID = nextMsgID.getAndIncrement();
-
-        synchronized (stateLock) {
-            if (connectionInvalidReason != null) {
-                return new CompletedFutureResult<Void>(newErrorResult(connectionInvalidReason),
-                        messageID);
-            }
-            if (bindOrStartTLSInProgress.get()) {
-                final Result errorResult =
-                        Responses.newResult(ResultCode.OPERATIONS_ERROR).setDiagnosticMessage(
-                                "Bind or Start TLS operation in progress");
-                return new CompletedFutureResult<Void>(newErrorResult(errorResult), messageID);
-            }
-
-            // First remove the future associated with the request to be
-            // abandoned.
-            pendingRequest = pendingRequests.remove(request.getRequestID());
-        }
-
-        if (pendingRequest == null) {
-            // There has never been a request with the specified message ID or
-            // the response has already been received and handled. We can ignore
-            // this abandon request.
-
-            // Message ID will be -1 since no request was sent.
-            return new CompletedFutureResult<Void>((Void) null);
-        }
-
-        pendingRequest.cancel(false);
-
         try {
-            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
-            try {
-                ldapWriter.abandonRequest(asn1Writer, messageID, request);
-                connection.write(asn1Writer.getBuffer(), null);
-                return new CompletedFutureResult<Void>((Void) null, messageID);
-            } finally {
-                asn1Writer.recycle();
+            synchronized (stateLock) {
+                checkConnectionIsValid();
+                checkBindOrStartTLSInProgress();
+                // Remove the future associated with the request to be abandoned.
+                pendingRequest = pendingRequests.remove(request.getRequestID());
             }
-        } catch (final IOException e) {
-            // FIXME: what other sort of IOExceptions can be thrown?
-            // FIXME: Is this the best result code?
-            final Result errorResult =
-                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
-            connectionErrorOccurred(errorResult);
-            return new CompletedFutureResult<Void>(newErrorResult(errorResult), messageID);
+            if (pendingRequest == null) {
+                // There has never been a request with the specified message ID or
+                // the response has already been received and handled. We can ignore
+                // this abandon request.
+
+                // Message ID will be -1 since no request was sent.
+                return new CompletedFutureResult<Void>((Void) null);
+            }
+            pendingRequest.cancel(false);
+            try {
+                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+                try {
+                    ldapWriter.abandonRequest(asn1Writer, messageID, request);
+                    connection.write(asn1Writer.getBuffer(), null);
+                    return new CompletedFutureResult<Void>((Void) null, messageID);
+                } finally {
+                    asn1Writer.recycle();
+                }
+            } catch (final IOException e) {
+                throw adaptRequestIOException(e);
+            }
+        } catch (final ErrorResultException e) {
+            return new CompletedFutureResult<Void>(e, messageID);
         }
     }
 
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<Result> addAsync(final AddRequest request,
             final IntermediateResponseHandler intermediateResponseHandler,
             final ResultHandler<? super Result> resultHandler) {
@@ -177,59 +164,67 @@
         final LDAPFutureResultImpl future =
                 new LDAPFutureResultImpl(messageID, request, resultHandler,
                         intermediateResponseHandler, this);
-
-        synchronized (stateLock) {
-            if (connectionInvalidReason != null) {
-                future.adaptErrorResult(connectionInvalidReason);
-                return future;
-            }
-            if (bindOrStartTLSInProgress.get()) {
-                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
-                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
-                return future;
-            }
-            pendingRequests.put(messageID, future);
-        }
-
         try {
-            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
-            try {
-                ldapWriter.addRequest(asn1Writer, messageID, request);
-                connection.write(asn1Writer.getBuffer(), null);
-            } finally {
-                asn1Writer.recycle();
+            synchronized (stateLock) {
+                checkConnectionIsValid();
+                checkBindOrStartTLSInProgress();
+                pendingRequests.put(messageID, future);
             }
-        } catch (final IOException e) {
-            pendingRequests.remove(messageID);
-
-            // FIXME: what other sort of IOExceptions can be thrown?
-            // FIXME: Is this the best result code?
-            final Result errorResult =
-                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
-            connectionErrorOccurred(errorResult);
-            future.adaptErrorResult(errorResult);
+            try {
+                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+                try {
+                    ldapWriter.addRequest(asn1Writer, messageID, request);
+                    connection.write(asn1Writer.getBuffer(), null);
+                } finally {
+                    asn1Writer.recycle();
+                }
+            } catch (final IOException e) {
+                pendingRequests.remove(messageID);
+                throw adaptRequestIOException(e);
+            }
+        } catch (final ErrorResultException e) {
+            future.adaptErrorResult(e.getResult());
         }
-
         return future;
     }
 
     /**
      * {@inheritDoc}
      */
+    @Override
     public void addConnectionEventListener(final ConnectionEventListener listener) {
         Validator.ensureNotNull(listener);
-        listeners.add(listener);
+        final boolean notifyClose;
+        final boolean notifyErrorOccurred;
+        synchronized (stateLock) {
+            notifyClose = isClosed;
+            notifyErrorOccurred = isFailed;
+            if (!isClosed) {
+                if (listeners == null) {
+                    listeners = new LinkedList<ConnectionEventListener>();
+                }
+                listeners.add(listener);
+            }
+        }
+        if (notifyErrorOccurred) {
+            // Use the reason provided in the disconnect notification.
+            listener.handleConnectionError(failedDueToDisconnect,
+                    newErrorResult(connectionInvalidReason));
+        }
+        if (notifyClose) {
+            listener.handleConnectionClosed();
+        }
     }
 
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<BindResult> bindAsync(final BindRequest request,
             final IntermediateResponseHandler intermediateResponseHandler,
             final ResultHandler<? super BindResult> resultHandler) {
         final int messageID = nextMsgID.getAndIncrement();
-
-        BindClient context;
+        final BindClient context;
         try {
             context =
                     request.createBindClient(
@@ -253,49 +248,41 @@
                 new LDAPBindFutureResultImpl(messageID, context, resultHandler,
                         intermediateResponseHandler, this);
 
-        synchronized (stateLock) {
-            if (connectionInvalidReason != null) {
-                future.adaptErrorResult(connectionInvalidReason);
-                return future;
-            }
-            if (!pendingRequests.isEmpty()) {
-                future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
-                        .setDiagnosticMessage(
-                                "There are other operations pending on this connection"));
-                return future;
-            }
-
-            if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
-                future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
-                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
-                return future;
-            }
-
-            pendingRequests.put(messageID, future);
-        }
-
         try {
-            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
-            try {
-                // Use the bind client to get the initial request instead of
-                // using the bind request passed to this method.
-                final GenericBindRequest initialRequest = context.nextBindRequest();
-                ldapWriter.bindRequest(asn1Writer, messageID, 3, initialRequest);
-                connection.write(asn1Writer.getBuffer(), null);
-            } finally {
-                asn1Writer.recycle();
+            synchronized (stateLock) {
+                checkConnectionIsValid();
+                if (!pendingRequests.isEmpty()) {
+                    future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
+                            .setDiagnosticMessage(
+                                    "There are other operations pending on this connection"));
+                    return future;
+                }
+                if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
+                    future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
+                            .setDiagnosticMessage("Bind or Start TLS operation in progress"));
+                    return future;
+                }
+                pendingRequests.put(messageID, future);
             }
-        } catch (final IOException e) {
-            pendingRequests.remove(messageID);
 
-            bindOrStartTLSInProgress.set(false);
-
-            // FIXME: what other sort of IOExceptions can be thrown?
-            // FIXME: Is this the best result code?
-            final Result errorResult =
-                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
-            connectionErrorOccurred(errorResult);
-            future.adaptErrorResult(errorResult);
+            try {
+                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+                try {
+                    // Use the bind client to get the initial request instead of
+                    // using the bind request passed to this method.
+                    final GenericBindRequest initialRequest = context.nextBindRequest();
+                    ldapWriter.bindRequest(asn1Writer, messageID, 3, initialRequest);
+                    connection.write(asn1Writer.getBuffer(), null);
+                } finally {
+                    asn1Writer.recycle();
+                }
+            } catch (final IOException e) {
+                pendingRequests.remove(messageID);
+                bindOrStartTLSInProgress.set(false);
+                throw adaptRequestIOException(e);
+            }
+        } catch (final ErrorResultException e) {
+            future.adaptErrorResult(e.getResult());
         }
 
         return future;
@@ -304,6 +291,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public void close(final UnbindRequest request, final String reason) {
         // FIXME: I18N need to internationalize this message.
         Validator.ensureNotNull(request);
@@ -316,6 +304,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<CompareResult> compareAsync(final CompareRequest request,
             final IntermediateResponseHandler intermediateResponseHandler,
             final ResultHandler<? super CompareResult> resultHandler) {
@@ -323,45 +312,34 @@
         final LDAPCompareFutureResultImpl future =
                 new LDAPCompareFutureResultImpl(messageID, request, resultHandler,
                         intermediateResponseHandler, this);
-
-        synchronized (stateLock) {
-            if (connectionInvalidReason != null) {
-                future.adaptErrorResult(connectionInvalidReason);
-                return future;
-            }
-            if (bindOrStartTLSInProgress.get()) {
-                future.setResultOrError(Responses.newCompareResult(ResultCode.OPERATIONS_ERROR)
-                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
-                return future;
-            }
-            pendingRequests.put(messageID, future);
-        }
-
         try {
-            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
-            try {
-                ldapWriter.compareRequest(asn1Writer, messageID, request);
-                connection.write(asn1Writer.getBuffer(), null);
-            } finally {
-                asn1Writer.recycle();
+            synchronized (stateLock) {
+                checkConnectionIsValid();
+                checkBindOrStartTLSInProgress();
+                pendingRequests.put(messageID, future);
             }
-        } catch (final IOException e) {
-            pendingRequests.remove(messageID);
-
-            // FIXME: what other sort of IOExceptions can be thrown?
-            // FIXME: Is this the best result code?
-            final Result errorResult =
-                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
-            connectionErrorOccurred(errorResult);
-            future.adaptErrorResult(errorResult);
+            try {
+                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+                try {
+                    ldapWriter.compareRequest(asn1Writer, messageID, request);
+                    connection.write(asn1Writer.getBuffer(), null);
+                } finally {
+                    asn1Writer.recycle();
+                }
+            } catch (final IOException e) {
+                pendingRequests.remove(messageID);
+                throw adaptRequestIOException(e);
+            }
+        } catch (final ErrorResultException e) {
+            future.adaptErrorResult(e.getResult());
         }
-
         return future;
     }
 
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<Result> deleteAsync(final DeleteRequest request,
             final IntermediateResponseHandler intermediateResponseHandler,
             final ResultHandler<? super Result> resultHandler) {
@@ -369,45 +347,34 @@
         final LDAPFutureResultImpl future =
                 new LDAPFutureResultImpl(messageID, request, resultHandler,
                         intermediateResponseHandler, this);
-
-        synchronized (stateLock) {
-            if (connectionInvalidReason != null) {
-                future.adaptErrorResult(connectionInvalidReason);
-                return future;
-            }
-            if (bindOrStartTLSInProgress.get()) {
-                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
-                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
-                return future;
-            }
-            pendingRequests.put(messageID, future);
-        }
-
         try {
-            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
-            try {
-                ldapWriter.deleteRequest(asn1Writer, messageID, request);
-                connection.write(asn1Writer.getBuffer(), null);
-            } finally {
-                asn1Writer.recycle();
+            synchronized (stateLock) {
+                checkConnectionIsValid();
+                checkBindOrStartTLSInProgress();
+                pendingRequests.put(messageID, future);
             }
-        } catch (final IOException e) {
-            pendingRequests.remove(messageID);
-
-            // FIXME: what other sort of IOExceptions can be thrown?
-            // FIXME: Is this the best result code?
-            final Result errorResult =
-                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
-            connectionErrorOccurred(errorResult);
-            future.adaptErrorResult(errorResult);
+            try {
+                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+                try {
+                    ldapWriter.deleteRequest(asn1Writer, messageID, request);
+                    connection.write(asn1Writer.getBuffer(), null);
+                } finally {
+                    asn1Writer.recycle();
+                }
+            } catch (final IOException e) {
+                pendingRequests.remove(messageID);
+                throw adaptRequestIOException(e);
+            }
+        } catch (final ErrorResultException e) {
+            future.adaptErrorResult(e.getResult());
         }
-
         return future;
     }
 
     /**
      * {@inheritDoc}
      */
+    @Override
     public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
             final ExtendedRequest<R> request,
             final IntermediateResponseHandler intermediateResponseHandler,
@@ -416,68 +383,54 @@
         final LDAPExtendedFutureResultImpl<R> future =
                 new LDAPExtendedFutureResultImpl<R>(messageID, request, resultHandler,
                         intermediateResponseHandler, this);
-
-        synchronized (stateLock) {
-            if (connectionInvalidReason != null) {
-                future.adaptErrorResult(connectionInvalidReason);
-                return future;
-            }
-            if (request.getOID().equals(StartTLSExtendedRequest.OID)) {
-                if (!pendingRequests.isEmpty()) {
-                    future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
-                            ResultCode.OPERATIONS_ERROR, "",
-                            "There are pending operations on this connection"));
-                    return future;
-                }
-                if (isTLSEnabled()) {
-                    future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
-                            ResultCode.OPERATIONS_ERROR, "",
-                            "This connection is already TLS enabled"));
-                    return future;
-                }
-                if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
-                    future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
-                            ResultCode.OPERATIONS_ERROR, "",
-                            "Bind or Start TLS operation in progress"));
-                    return future;
-                }
-            } else {
-                if (bindOrStartTLSInProgress.get()) {
-                    future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
-                            ResultCode.OPERATIONS_ERROR, "",
-                            "Bind or Start TLS operation in progress"));
-                    return future;
-                }
-            }
-            pendingRequests.put(messageID, future);
-        }
-
         try {
-            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
-            try {
-                ldapWriter.extendedRequest(asn1Writer, messageID, request);
-                connection.write(asn1Writer.getBuffer(), null);
-            } finally {
-                asn1Writer.recycle();
+            synchronized (stateLock) {
+                checkConnectionIsValid();
+                if (request.getOID().equals(StartTLSExtendedRequest.OID)) {
+                    if (!pendingRequests.isEmpty()) {
+                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
+                                ResultCode.OPERATIONS_ERROR, "",
+                                "There are pending operations on this connection"));
+                        return future;
+                    } else if (isTLSEnabled()) {
+                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
+                                ResultCode.OPERATIONS_ERROR, "",
+                                "This connection is already TLS enabled"));
+                        return future;
+                    } else if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
+                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
+                                ResultCode.OPERATIONS_ERROR, "",
+                                "Bind or Start TLS operation in progress"));
+                        return future;
+                    }
+                } else {
+                    checkBindOrStartTLSInProgress();
+                }
+                pendingRequests.put(messageID, future);
             }
-        } catch (final IOException e) {
-            pendingRequests.remove(messageID);
-            bindOrStartTLSInProgress.set(false);
-
-            // FIXME: what other sort of IOExceptions can be thrown?
-            // FIXME: Is this the best result code?
-            final Result errorResult =
-                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
-            connectionErrorOccurred(errorResult);
-            future.adaptErrorResult(errorResult);
+            try {
+                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+                try {
+                    ldapWriter.extendedRequest(asn1Writer, messageID, request);
+                    connection.write(asn1Writer.getBuffer(), null);
+                } finally {
+                    asn1Writer.recycle();
+                }
+            } catch (final IOException e) {
+                pendingRequests.remove(messageID);
+                bindOrStartTLSInProgress.set(false);
+                throw adaptRequestIOException(e);
+            }
+        } catch (final ErrorResultException e) {
+            future.adaptErrorResult(e.getResult());
         }
-
         return future;
     }
 
     /**
      * {@inheritDoc}
      */
+    @Override
     public boolean isClosed() {
         synchronized (stateLock) {
             return isClosed;
@@ -487,15 +440,17 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public boolean isValid() {
         synchronized (stateLock) {
-            return connectionInvalidReason == null && !isClosed;
+            return isValid0();
         }
     }
 
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<Result> modifyAsync(final ModifyRequest request,
             final IntermediateResponseHandler intermediateResponseHandler,
             final ResultHandler<? super Result> resultHandler) {
@@ -503,45 +458,34 @@
         final LDAPFutureResultImpl future =
                 new LDAPFutureResultImpl(messageID, request, resultHandler,
                         intermediateResponseHandler, this);
-
-        synchronized (stateLock) {
-            if (connectionInvalidReason != null) {
-                future.adaptErrorResult(connectionInvalidReason);
-                return future;
-            }
-            if (bindOrStartTLSInProgress.get()) {
-                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
-                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
-                return future;
-            }
-            pendingRequests.put(messageID, future);
-        }
-
         try {
-            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
-            try {
-                ldapWriter.modifyRequest(asn1Writer, messageID, request);
-                connection.write(asn1Writer.getBuffer(), null);
-            } finally {
-                asn1Writer.recycle();
+            synchronized (stateLock) {
+                checkConnectionIsValid();
+                checkBindOrStartTLSInProgress();
+                pendingRequests.put(messageID, future);
             }
-        } catch (final IOException e) {
-            pendingRequests.remove(messageID);
-
-            // FIXME: what other sort of IOExceptions can be thrown?
-            // FIXME: Is this the best result code?
-            final Result errorResult =
-                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
-            connectionErrorOccurred(errorResult);
-            future.adaptErrorResult(errorResult);
+            try {
+                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+                try {
+                    ldapWriter.modifyRequest(asn1Writer, messageID, request);
+                    connection.write(asn1Writer.getBuffer(), null);
+                } finally {
+                    asn1Writer.recycle();
+                }
+            } catch (final IOException e) {
+                pendingRequests.remove(messageID);
+                throw adaptRequestIOException(e);
+            }
+        } catch (final ErrorResultException e) {
+            future.adaptErrorResult(e.getResult());
         }
-
         return future;
     }
 
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
             final IntermediateResponseHandler intermediateResponseHandler,
             final ResultHandler<? super Result> resultHandler) {
@@ -549,53 +493,47 @@
         final LDAPFutureResultImpl future =
                 new LDAPFutureResultImpl(messageID, request, resultHandler,
                         intermediateResponseHandler, this);
-
-        synchronized (stateLock) {
-            if (connectionInvalidReason != null) {
-                future.adaptErrorResult(connectionInvalidReason);
-                return future;
-            }
-            if (bindOrStartTLSInProgress.get()) {
-                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
-                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
-                return future;
-            }
-            pendingRequests.put(messageID, future);
-        }
-
         try {
-            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
-            try {
-                ldapWriter.modifyDNRequest(asn1Writer, messageID, request);
-                connection.write(asn1Writer.getBuffer(), null);
-            } finally {
-                asn1Writer.recycle();
+            synchronized (stateLock) {
+                checkConnectionIsValid();
+                checkBindOrStartTLSInProgress();
+                pendingRequests.put(messageID, future);
             }
-        } catch (final IOException e) {
-            pendingRequests.remove(messageID);
-
-            // FIXME: what other sort of IOExceptions can be thrown?
-            // FIXME: Is this the best result code?
-            final Result errorResult =
-                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
-            connectionErrorOccurred(errorResult);
-            future.adaptErrorResult(errorResult);
+            try {
+                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+                try {
+                    ldapWriter.modifyDNRequest(asn1Writer, messageID, request);
+                    connection.write(asn1Writer.getBuffer(), null);
+                } finally {
+                    asn1Writer.recycle();
+                }
+            } catch (final IOException e) {
+                pendingRequests.remove(messageID);
+                throw adaptRequestIOException(e);
+            }
+        } catch (final ErrorResultException e) {
+            future.adaptErrorResult(e.getResult());
         }
-
         return future;
     }
 
     /**
      * {@inheritDoc}
      */
+    @Override
     public void removeConnectionEventListener(final ConnectionEventListener listener) {
         Validator.ensureNotNull(listener);
-        listeners.remove(listener);
+        synchronized (stateLock) {
+            if (listeners != null) {
+                listeners.remove(listener);
+            }
+        }
     }
 
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<Result> searchAsync(final SearchRequest request,
             final IntermediateResponseHandler intermediateResponseHandler,
             final SearchResultHandler resultHandler) {
@@ -603,47 +541,36 @@
         final LDAPSearchFutureResultImpl future =
                 new LDAPSearchFutureResultImpl(messageID, request, resultHandler,
                         intermediateResponseHandler, this);
-
-        synchronized (stateLock) {
-            if (connectionInvalidReason != null) {
-                future.adaptErrorResult(connectionInvalidReason);
-                return future;
-            }
-            if (bindOrStartTLSInProgress.get()) {
-                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
-                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
-                return future;
-            }
-            pendingRequests.put(messageID, future);
-        }
-
         try {
-            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
-            try {
-                ldapWriter.searchRequest(asn1Writer, messageID, request);
-                connection.write(asn1Writer.getBuffer(), null);
-            } finally {
-                asn1Writer.recycle();
+            synchronized (stateLock) {
+                checkConnectionIsValid();
+                checkBindOrStartTLSInProgress();
+                pendingRequests.put(messageID, future);
             }
-        } catch (final IOException e) {
-            pendingRequests.remove(messageID);
-
-            // FIXME: what other sort of IOExceptions can be thrown?
-            // FIXME: Is this the best result code?
-            final Result errorResult =
-                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
-            connectionErrorOccurred(errorResult);
-            future.adaptErrorResult(errorResult);
+            try {
+                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
+                try {
+                    ldapWriter.searchRequest(asn1Writer, messageID, request);
+                    connection.write(asn1Writer.getBuffer(), null);
+                } finally {
+                    asn1Writer.recycle();
+                }
+            } catch (final IOException e) {
+                pendingRequests.remove(messageID);
+                throw adaptRequestIOException(e);
+            }
+        } catch (final ErrorResultException e) {
+            future.adaptErrorResult(e.getResult());
         }
-
         return future;
     }
 
     /**
      * {@inheritDoc}
      */
+    @Override
     public String toString() {
-        StringBuilder builder = new StringBuilder();
+        final StringBuilder builder = new StringBuilder();
         builder.append("LDAPConnection(");
         builder.append(connection.getLocalAddress());
         builder.append(',');
@@ -652,23 +579,11 @@
         return builder.toString();
     }
 
-    int continuePendingBindRequest(final LDAPBindFutureResultImpl future)
-            throws ErrorResultException {
-        final int newMsgID = nextMsgID.getAndIncrement();
-        synchronized (stateLock) {
-            if (connectionInvalidReason != null) {
-                throw newErrorResult(connectionInvalidReason);
-            }
-            pendingRequests.put(newMsgID, future);
-        }
-        return newMsgID;
-    }
-
     long cancelExpiredRequests(final long currentTime) {
         final long timeout = options.getTimeout(TimeUnit.MILLISECONDS);
         long delay = timeout;
         if (timeout > 0) {
-            for (int requestID : pendingRequests.keySet()) {
+            for (final int requestID : pendingRequests.keySet()) {
                 final AbstractLDAPFutureResultImpl<?> future = pendingRequests.get(requestID);
                 if (future != null) {
                     final long diff = (future.getTimestamp() + timeout) - currentTime;
@@ -687,52 +602,53 @@
         return delay;
     }
 
+    /**
+     * Closes this connection, invoking event listeners as needed.
+     *
+     * @param unbindRequest
+     *            The client provided unbind request if this is a client
+     *            initiated close, or {@code null} if the connection has failed.
+     * @param isDisconnectNotification
+     *            {@code true} if this is a connection failure signalled by a
+     *            server disconnect notification.
+     * @param reason
+     *            The result indicating why the connection was closed.
+     */
     void close(final UnbindRequest unbindRequest, final boolean isDisconnectNotification,
             final Result reason) {
-        boolean notifyClose = false;
-        boolean notifyErrorOccurred = false;
-
+        final boolean notifyClose;
+        final boolean notifyErrorOccurred;
+        final List<ConnectionEventListener> tmpListeners;
         synchronized (stateLock) {
             if (isClosed) {
-                // Already closed.
+                // Already closed locally.
                 return;
-            }
-
-            if (connectionInvalidReason != null) {
-                // Already closed.
-                isClosed = true;
-                return;
-            }
-
-            if (unbindRequest != null) {
-                // User closed.
-                isClosed = true;
+            } else if (unbindRequest != null) {
+                // Local close.
                 notifyClose = true;
+                notifyErrorOccurred = false;
+                isClosed = true;
+                tmpListeners = listeners;
+                listeners = null; // Prevent future invocations.
+                if (connectionInvalidReason == null) {
+                    connectionInvalidReason = reason;
+                }
+            } else if (isFailed) {
+                // Already failed.
+                return;
             } else {
+                // Connection has failed and this is the first indication.
+                notifyClose = false;
                 notifyErrorOccurred = true;
-            }
-
-            // Mark the connection as invalid.
-            if (!isDisconnectNotification) {
-                // Connection termination was detected locally, so use the
-                // provided
-                // reason for all subsequent requests.
+                isFailed = true;
+                failedDueToDisconnect = isDisconnectNotification;
                 connectionInvalidReason = reason;
-            } else {
-                // Connection termination was triggered remotely. We don't want
-                // to blindly pass on the result code to requests since it could
-                // be confused for a genuine response. For example, if the
-                // disconnect contained the invalidCredentials result code then
-                // this could be misinterpreted as a genuine authentication
-                // failure for subsequent bind requests.
-                connectionInvalidReason =
-                        Responses.newResult(ResultCode.CLIENT_SIDE_SERVER_DOWN)
-                                .setDiagnosticMessage("Connection closed by server");
+                tmpListeners = listeners; // Keep list for client close.
             }
         }
 
         // First abort all outstanding requests.
-        for (int requestID : pendingRequests.keySet()) {
+        for (final int requestID : pendingRequests.keySet()) {
             final AbstractLDAPFutureResultImpl<?> future = pendingRequests.remove(requestID);
             if (future != null) {
                 future.adaptErrorResult(connectionInvalidReason);
@@ -741,7 +657,7 @@
 
         // Now try cleanly closing the connection if possible.
         // Only send unbind if specified.
-        if (unbindRequest != null && !isDisconnectNotification) {
+        if (unbindRequest != null) {
             try {
                 final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                 try {
@@ -758,18 +674,29 @@
         connection.closeSilently();
 
         // Notify listeners.
-        if (notifyClose) {
-            for (final ConnectionEventListener listener : listeners) {
-                listener.handleConnectionClosed();
+        if (tmpListeners != null) {
+            if (notifyErrorOccurred) {
+                for (final ConnectionEventListener listener : tmpListeners) {
+                    // Use the reason provided in the disconnect notification.
+                    listener.handleConnectionError(isDisconnectNotification, newErrorResult(reason));
+                }
+            }
+            if (notifyClose) {
+                for (final ConnectionEventListener listener : tmpListeners) {
+                    listener.handleConnectionClosed();
+                }
             }
         }
+    }
 
-        if (notifyErrorOccurred) {
-            for (final ConnectionEventListener listener : listeners) {
-                // Use the reason provided in the disconnect notification.
-                listener.handleConnectionError(isDisconnectNotification, newErrorResult(reason));
-            }
+    int continuePendingBindRequest(final LDAPBindFutureResultImpl future)
+            throws ErrorResultException {
+        final int newMsgID = nextMsgID.getAndIncrement();
+        synchronized (stateLock) {
+            checkConnectionIsValid();
+            pendingRequests.put(newMsgID, future);
         }
+        return newMsgID;
     }
 
     LDAPOptions getLDAPOptions() {
@@ -781,13 +708,14 @@
     }
 
     void handleUnsolicitedNotification(final ExtendedResult result) {
-        if (isClosed()) {
-            // Don't notify after connection is closed.
-            return;
+        final List<ConnectionEventListener> tmpListeners;
+        synchronized (stateLock) {
+            tmpListeners = listeners;
         }
-
-        for (final ConnectionEventListener listener : listeners) {
-            listener.handleUnsolicitedNotification(result);
+        if (tmpListeners != null) {
+            for (final ConnectionEventListener listener : tmpListeners) {
+                listener.handleUnsolicitedNotification(result);
+            }
         }
     }
 
@@ -801,7 +729,7 @@
     void installFilter(final Filter filter) {
         synchronized (stateLock) {
             // Determine the index where the filter should be added.
-            FilterChain oldFilterChain = (FilterChain) connection.getProcessor();
+            final FilterChain oldFilterChain = (FilterChain) connection.getProcessor();
             int filterIndex = oldFilterChain.size() - 1;
             if (filter instanceof SSLFilter) {
                 // Beneath any ConnectionSecurityLayerFilters if present,
@@ -815,7 +743,7 @@
             }
 
             // Create the new filter chain.
-            FilterChain newFilterChain =
+            final FilterChain newFilterChain =
                     FilterChainBuilder.stateless().addAll(oldFilterChain).add(filterIndex, filter)
                             .build();
             connection.setProcessor(newFilterChain);
@@ -831,7 +759,7 @@
     boolean isTLSEnabled() {
         synchronized (stateLock) {
             final FilterChain currentFilterChain = (FilterChain) connection.getProcessor();
-            for (Filter filter : currentFilterChain) {
+            for (final Filter filter : currentFilterChain) {
                 if (filter instanceof SSLFilter) {
                     return true;
                 }
@@ -856,19 +784,56 @@
                 throw new IllegalStateException("TLS already enabled");
             }
 
-            SSLEngineConfigurator sslEngineConfigurator =
+            final SSLEngineConfigurator sslEngineConfigurator =
                     new SSLEngineConfigurator(sslContext, true, false, false);
             sslEngineConfigurator.setEnabledProtocols(protocols.isEmpty() ? null : protocols
                     .toArray(new String[protocols.size()]));
             sslEngineConfigurator.setEnabledCipherSuites(cipherSuites.isEmpty() ? null
                     : cipherSuites.toArray(new String[cipherSuites.size()]));
-            SSLFilter sslFilter = new SSLFilter(null, sslEngineConfigurator);
+            final SSLFilter sslFilter = new SSLFilter(null, sslEngineConfigurator);
             installFilter(sslFilter);
             sslFilter.handshake(connection, completionHandler);
         }
     }
 
+    private ErrorResultException adaptRequestIOException(final IOException e) {
+        // FIXME: what other sort of IOExceptions can be thrown?
+        // FIXME: Is this the best result code?
+        final Result errorResult =
+                Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
+        connectionErrorOccurred(errorResult);
+        return newErrorResult(errorResult);
+    }
+
+    private void checkBindOrStartTLSInProgress() throws ErrorResultException {
+        if (bindOrStartTLSInProgress.get()) {
+            throw newErrorResult(ResultCode.OPERATIONS_ERROR,
+                    "Bind or Start TLS operation in progress");
+        }
+    }
+
+    private void checkConnectionIsValid() throws ErrorResultException {
+        if (!isValid0()) {
+            if (failedDueToDisconnect) {
+                // Connection termination was triggered remotely. We don't want
+                // to blindly pass on the result code to requests since it could
+                // be confused for a genuine response. For example, if the
+                // disconnect contained the invalidCredentials result code then
+                // this could be misinterpreted as a genuine authentication
+                // failure for subsequent bind requests.
+                throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN,
+                        "Connection closed by server");
+            } else {
+                throw newErrorResult(connectionInvalidReason);
+            }
+        }
+    }
+
     private void connectionErrorOccurred(final Result reason) {
         close(null, false, reason);
     }
+
+    private boolean isValid0() {
+        return !isFailed && !isClosed;
+    }
 }

--
Gitblit v1.10.0