From 1fbc9df5d0c44ae72c76dacc3c945d4f0d641ee6 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Tue, 22 Dec 2009 05:55:54 +0000
Subject: [PATCH] Initial implementation of fail over connection factory
---
sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java | 234 +++++++++++++++++++++++++++++-----------------------------
1 files changed, 116 insertions(+), 118 deletions(-)
diff --git a/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java b/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
index 5d91e23..9e9f8f1 100644
--- a/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
+++ b/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
@@ -37,9 +37,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
@@ -61,7 +59,7 @@
import com.sun.grizzly.ssl.*;
import com.sun.grizzly.streams.StreamWriter;
import com.sun.opends.sdk.util.Validator;
-
+import com.sun.opends.sdk.util.StaticUtils;
/**
@@ -666,6 +664,8 @@
private final Object writeLock = new Object();
+ private final SearchRequest pingRequest;
+
/**
* Creates a new LDAP connection.
*
@@ -676,18 +676,22 @@
* @param schema
* The schema which will be used to decode responses from the
* server.
+ * @param pingRequest
+ * The search request to use to verify the validity of
+ * this connection.
* @param connFactory
* The associated connection factory.
*/
LDAPConnection(com.sun.grizzly.Connection<?> connection,
InetSocketAddress serverAddress, Schema schema,
- LDAPConnectionFactoryImpl connFactory)
+ SearchRequest pingRequest, LDAPConnectionFactoryImpl connFactory)
{
this.connection = connection;
this.serverAddress = serverAddress;
this.schema = schema;
this.connFactory = connFactory;
this.streamWriter = getFilterChainStreamWriter();
+ this.pingRequest = pingRequest;
}
@@ -809,16 +813,7 @@
NullPointerException
{
Validator.ensureNotNull(listener);
-
- synchronized (writeLock)
- {
- if (isClosed)
- {
- throw new IllegalStateException();
- }
-
- listeners.add(listener);
- }
+ listeners.add(listener);
}
@@ -1258,11 +1253,7 @@
ConnectionEventListener listener) throws NullPointerException
{
Validator.ensureNotNull(listener);
-
- synchronized (writeLock)
- {
- listeners.remove(listener);
- }
+ listeners.remove(listener);
}
@@ -1354,13 +1345,17 @@
private void close(UnbindRequest unbindRequest,
- boolean isDisconnectNotification, Result reason)
+ boolean isDisconnectNotification, Result reason)
{
+ boolean notifyClose = false;
+ boolean notifyErrorOccurred = false;
+
synchronized (writeLock)
{
- boolean notifyClose = false;
- boolean notifyErrorOccurred = false;
-
+ if(reason == null && unbindRequest == null)
+ {
+ System.out.println("Something is wrong!");
+ }
if (isClosed)
{
// Already closed.
@@ -1381,112 +1376,104 @@
if (connectionInvalidReason != null)
{
// Already invalid.
- if (notifyClose)
- {
- // TODO: uncomment if close notification is required.
- // for (ConnectionEventListener listener : listeners)
- // {
- // listener.connectionClosed(this);
- // }
- }
return;
}
- // First abort all outstanding requests.
- for (AbstractLDAPFutureResultImpl<?> future : pendingRequests
- .values())
- {
- if (pendingBindOrStartTLS <= 0)
- {
- ASN1StreamWriter asn1Writer = connFactory
- .getASN1Writer(streamWriter);
- int messageID = nextMsgID.getAndIncrement();
- AbandonRequest abandon = Requests.newAbandonRequest(future
- .getRequestID());
- try
- {
- LDAPEncoder.encodeAbandonRequest(asn1Writer, messageID,
- abandon);
- asn1Writer.flush();
- }
- catch (IOException e)
- {
- // Underlying channel probably blown up. Just ignore.
- }
- finally
- {
- connFactory.releaseASN1Writer(asn1Writer);
- }
- }
+ // Mark the connection as invalid.
+ connectionInvalidReason = reason;
+ }
- future.adaptErrorResult(reason);
- }
- pendingRequests.clear();
-
- // Now try cleanly closing the connection if possible.
- try
+ // First abort all outstanding requests.
+ for (AbstractLDAPFutureResultImpl<?> future : pendingRequests
+ .values())
+ {
+ if (pendingBindOrStartTLS <= 0)
{
ASN1StreamWriter asn1Writer = connFactory
.getASN1Writer(streamWriter);
- if (unbindRequest == null)
- {
- unbindRequest = Requests.newUnbindRequest();
- }
-
+ int messageID = nextMsgID.getAndIncrement();
+ AbandonRequest abandon = Requests.newAbandonRequest(future
+ .getRequestID());
try
{
- LDAPEncoder.encodeUnbindRequest(asn1Writer, nextMsgID
- .getAndIncrement(), unbindRequest);
+ LDAPEncoder.encodeAbandonRequest(asn1Writer, messageID,
+ abandon);
asn1Writer.flush();
}
+ catch (IOException e)
+ {
+ // Underlying channel probably blown up. Just ignore.
+ }
finally
{
connFactory.releaseASN1Writer(asn1Writer);
}
}
- catch (IOException e)
+
+ future.adaptErrorResult(reason);
+ }
+ pendingRequests.clear();
+
+ // Now try cleanly closing the connection if possible.
+ try
+ {
+ ASN1StreamWriter asn1Writer = connFactory
+ .getASN1Writer(streamWriter);
+ if (unbindRequest == null)
{
- // Underlying channel prob blown up. Just ignore.
+ unbindRequest = Requests.newUnbindRequest();
}
try
{
- streamWriter.close();
+ LDAPEncoder.encodeUnbindRequest(asn1Writer, nextMsgID
+ .getAndIncrement(), unbindRequest);
+ asn1Writer.flush();
}
- catch (IOException e)
+ finally
{
- // Ignore.
+ connFactory.releaseASN1Writer(asn1Writer);
}
+ }
+ catch (IOException e)
+ {
+ // Underlying channel prob blown up. Just ignore.
+ }
- try
- {
- connection.close();
- }
- catch (IOException e)
- {
- // Ignore.
- }
+ try
+ {
+ streamWriter.close();
+ }
+ catch (IOException e)
+ {
+ // Ignore.
+ }
- // Mark the connection as invalid.
- connectionInvalidReason = reason;
+ try
+ {
+ connection.close();
+ }
+ catch (IOException e)
+ {
+ // Ignore.
+ }
- // Notify listeners.
- if (notifyClose)
- {
- // TODO: uncomment if close notification is required.
- // for (ConnectionEventListener listener : listeners)
- // {
- // listener.connectionClosed(this);
- // }
- }
+ // Notify listeners.
+ if (notifyClose)
+ {
+ // TODO: uncomment if close notification is required.
+ // for (ConnectionEventListener listener : listeners)
+ // {
+ // listener.connectionClosed(this);
+ // }
+ }
- if (notifyErrorOccurred)
+ if (notifyErrorOccurred)
+ {
+ for (ConnectionEventListener listener : listeners)
{
- for (ConnectionEventListener listener : listeners)
- {
- listener.connectionErrorOccurred(isDisconnectNotification,
- ErrorResultException.wrap(reason));
- }
+ listener.connectionErrorOccurred(isDisconnectNotification,
+ ErrorResultException.wrap(reason));
}
}
}
@@ -1506,27 +1493,38 @@
*/
public boolean isClosed()
{
- synchronized (writeLock)
- {
- return isClosed;
- }
+ return isClosed;
}
- //
- //
- //
- // /**
- // * {@inheritDoc}
- // */
- // public boolean isValid() throws InterruptedException
- // {
- // synchronized (writeLock)
- // {
- // return connectionInvalidReason == null;
- // }
- // }
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isValid()
+ {
+ if(!isClosed && connectionInvalidReason == null)
+ {
+ try
+ {
+ search(pingRequest, null, null).get(5, TimeUnit.SECONDS);
+ return true;
+ }
+ catch (ErrorResultException e)
+ {
+ StaticUtils.DEBUG_LOG.warning("Validity check returned error: " +
+ e.getResult());
+ }
+ catch (TimeoutException e)
+ {
+ StaticUtils.DEBUG_LOG.warning("Validity check timed out");
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ return false;
+ }
//
//
//
--
Gitblit v1.10.0