From 2db38261424c5ee69d5d59ad15258829359c76ef Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 18 Apr 2013 11:37:28 +0000
Subject: [PATCH] Fix OPENDJ-838: Add ConnectionFactory.close() method to facilitate resource cleanup after application exit
---
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionFactory.java | 27
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java | 23
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java | 107 +--
opendj-sdk/opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPAuthnFilter.java | 16
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connection.java | 3
opendj-sdk/opendj3/opendj-server2x-adapter/src/main/java/org/forgerock/opendj/adapter/server2x/Adapters.java | 5
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java | 65 +
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java | 6
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java | 28
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransport.java | 222 ++++---
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractConnectionWrapper.java | 9
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/ReferenceCountedObject.java | 162 +++++
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/InternalConnectionFactory.java | 18
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPListenerImpl.java | 45
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java | 201 +++---
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java | 152 ++---
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java | 53 +
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/StaticUtils.java | 44
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancingAlgorithm.java | 13
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java | 40 +
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java | 93 --
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java | 1
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AuthenticatedConnectionFactory.java | 51 -
opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransportTestCase.java | 11
opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/ReferenceCountedObjectTestCase.java | 151 +++++
opendj-sdk/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthenticatedConnectionFactory.java | 52 -
opendj-sdk/opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPConnectionFactoryProvider.java | 40 +
27 files changed, 987 insertions(+), 651 deletions(-)
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransport.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransport.java
index 9366df8..9f7c413 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransport.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransport.java
@@ -22,133 +22,139 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2011 ForgeRock AS
+ * Portions copyright 2011-2013 ForgeRock AS
*/
package com.forgerock.opendj.ldap;
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+
import java.io.IOException;
+import java.util.logging.Level;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
import org.glassfish.grizzly.strategies.WorkerThreadIOStrategy;
+import com.forgerock.opendj.util.ReferenceCountedObject;
+
/**
* The default {@link TCPNIOTransport} which all {@code LDAPConnectionFactory}s
* and {@code LDAPListener}s will use unless otherwise specified in their
* options.
*/
-final class DefaultTCPNIOTransport {
- private static TCPNIOTransport defaultTransport = null;
-
- /**
- * Returns the default {@link TCPNIOTransport} which all
- * {@code LDAPConnectionFactory}s and {@code LDAPListener}s will use unless
- * otherwise specified in their options.
- *
- * @return The default {@link TCPNIOTransport}.
- */
- static synchronized TCPNIOTransport getInstance() {
- if (defaultTransport == null) {
- final TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
-
- // Determine which threading strategy to use, and total number of
- // threads.
- final String useWorkerThreadsStr =
- System.getProperty("org.forgerock.opendj.transport.useWorkerThreads");
- final boolean useWorkerThreadStrategy;
- if (useWorkerThreadsStr != null) {
- useWorkerThreadStrategy = Boolean.parseBoolean(useWorkerThreadsStr);
- } else {
- // The most best performing strategy to use is the
- // SameThreadIOStrategy, however it can only be used in cases
- // where result listeners will not block.
- useWorkerThreadStrategy = true;
- }
-
- if (useWorkerThreadStrategy) {
- builder.setIOStrategy(WorkerThreadIOStrategy.getInstance());
- } else {
- builder.setIOStrategy(SameThreadIOStrategy.getInstance());
- }
-
- // Calculate thread counts.
- final int cpus = Runtime.getRuntime().availableProcessors();
-
- // Calculate the number of selector threads.
- final String selectorsStr =
- System.getProperty("org.forgerock.opendj.transport.selectors");
- final int selectorThreadCount;
-
- if (selectorsStr != null) {
- selectorThreadCount = Integer.parseInt(selectorsStr);
- } else {
- selectorThreadCount =
- useWorkerThreadStrategy ? Math.max(2, cpus / 4) : Math.max(5,
- (cpus / 2) - 1);
- }
-
- builder.getSelectorThreadPoolConfig().setCorePoolSize(selectorThreadCount)
- .setMaxPoolSize(selectorThreadCount).setPoolName(
- "OpenDJ LDAP SDK Grizzly selector thread");
-
- // Calculate the number of worker threads.
- if (builder.getWorkerThreadPoolConfig() != null) {
- final String workersStr =
- System.getProperty("org.forgerock.opendj.transport.workers");
- final int workerThreadCount;
-
- if (workersStr != null) {
- workerThreadCount = Integer.parseInt(workersStr);
- } else {
- workerThreadCount = useWorkerThreadStrategy ? Math.max(5, (cpus * 2)) : 0;
- }
-
- builder.getWorkerThreadPoolConfig().setCorePoolSize(workerThreadCount)
- .setMaxPoolSize(workerThreadCount).setPoolName(
- "OpenDJ LDAP SDK Grizzly worker thread");
- }
-
- // Parse IO related options.
- final String lingerStr = System.getProperty("org.forgerock.opendj.transport.linger");
- if (lingerStr != null) {
- // Disabled by default.
- builder.setLinger(Integer.parseInt(lingerStr));
- }
-
- final String tcpNoDelayStr =
- System.getProperty("org.forgerock.opendj.transport.tcpNoDelay");
- if (tcpNoDelayStr != null) {
- // Enabled by default.
- builder.setTcpNoDelay(Boolean.parseBoolean(tcpNoDelayStr));
- }
-
- final String reuseAddressStr =
- System.getProperty("org.forgerock.opendj.transport.reuseAddress");
- if (reuseAddressStr != null) {
- // Enabled by default.
- builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr));
- }
-
- defaultTransport = builder.build();
-
- // FIXME: raise bug in Grizzly. We should not need to do this, but
- // failure to do so causes many deadlocks.
- defaultTransport.setSelectorRunnersCount(selectorThreadCount);
-
- try {
- defaultTransport.start();
- } catch (final IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- return defaultTransport;
- }
+final class DefaultTCPNIOTransport extends ReferenceCountedObject<TCPNIOTransport> {
+ static final DefaultTCPNIOTransport DEFAULT_TRANSPORT = new DefaultTCPNIOTransport();
private DefaultTCPNIOTransport() {
// Prevent instantiation.
}
+ @Override
+ protected void destroyInstance(final TCPNIOTransport instance) {
+ try {
+ instance.stop();
+ } catch (final IOException e) {
+ DEBUG_LOG.log(Level.WARNING,
+ "An error occurred while shutting down the Grizzly transport", e.getMessage());
+ }
+ }
+
+ @Override
+ protected TCPNIOTransport newInstance() {
+ final TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
+
+ /*
+ * Determine which threading strategy to use, and total number of
+ * threads.
+ */
+ final String useWorkerThreadsStr =
+ System.getProperty("org.forgerock.opendj.transport.useWorkerThreads");
+ final boolean useWorkerThreadStrategy;
+ if (useWorkerThreadsStr != null) {
+ useWorkerThreadStrategy = Boolean.parseBoolean(useWorkerThreadsStr);
+ } else {
+ /*
+ * The most best performing strategy to use is the
+ * SameThreadIOStrategy, however it can only be used in cases where
+ * result listeners will not block.
+ */
+ useWorkerThreadStrategy = true;
+ }
+
+ if (useWorkerThreadStrategy) {
+ builder.setIOStrategy(WorkerThreadIOStrategy.getInstance());
+ } else {
+ builder.setIOStrategy(SameThreadIOStrategy.getInstance());
+ }
+
+ // Calculate thread counts.
+ final int cpus = Runtime.getRuntime().availableProcessors();
+
+ // Calculate the number of selector threads.
+ final String selectorsStr = System.getProperty("org.forgerock.opendj.transport.selectors");
+ final int selectorThreadCount;
+
+ if (selectorsStr != null) {
+ selectorThreadCount = Integer.parseInt(selectorsStr);
+ } else {
+ selectorThreadCount =
+ useWorkerThreadStrategy ? Math.max(2, cpus / 4) : Math.max(5, (cpus / 2) - 1);
+ }
+
+ builder.getSelectorThreadPoolConfig().setCorePoolSize(selectorThreadCount).setMaxPoolSize(
+ selectorThreadCount).setPoolName("OpenDJ LDAP SDK Grizzly selector thread");
+
+ // Calculate the number of worker threads.
+ if (builder.getWorkerThreadPoolConfig() != null) {
+ final String workersStr = System.getProperty("org.forgerock.opendj.transport.workers");
+ final int workerThreadCount;
+
+ if (workersStr != null) {
+ workerThreadCount = Integer.parseInt(workersStr);
+ } else {
+ workerThreadCount = useWorkerThreadStrategy ? Math.max(5, (cpus * 2)) : 0;
+ }
+
+ builder.getWorkerThreadPoolConfig().setCorePoolSize(workerThreadCount).setMaxPoolSize(
+ workerThreadCount).setPoolName("OpenDJ LDAP SDK Grizzly worker thread");
+ }
+
+ // Parse IO related options.
+ final String lingerStr = System.getProperty("org.forgerock.opendj.transport.linger");
+ if (lingerStr != null) {
+ // Disabled by default.
+ builder.setLinger(Integer.parseInt(lingerStr));
+ }
+
+ final String tcpNoDelayStr =
+ System.getProperty("org.forgerock.opendj.transport.tcpNoDelay");
+ if (tcpNoDelayStr != null) {
+ // Enabled by default.
+ builder.setTcpNoDelay(Boolean.parseBoolean(tcpNoDelayStr));
+ }
+
+ final String reuseAddressStr =
+ System.getProperty("org.forgerock.opendj.transport.reuseAddress");
+ if (reuseAddressStr != null) {
+ // Enabled by default.
+ builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr));
+ }
+
+ final TCPNIOTransport transport = builder.build();
+
+ // FIXME: raise bug in Grizzly. We should not need to do this, but
+ // failure to do so causes many deadlocks.
+ transport.setSelectorRunnersCount(selectorThreadCount);
+
+ try {
+ transport.start();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return transport;
+ }
+
}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java
index 32cb50d..f540a8d 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java
@@ -513,7 +513,6 @@
}
void registerConnection(final Connection<?> connection, final LDAPConnection ldapConnection) {
- TimeoutChecker.INSTANCE.addConnection(ldapConnection);
LDAP_CONNECTION_ATTR.set(connection, ldapConnection);
}
}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
index bb5c88a..dc20e65 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -27,6 +27,7 @@
package com.forgerock.opendj.ldap;
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.io.IOException;
@@ -81,7 +82,6 @@
import org.glassfish.grizzly.ssl.SSLFilter;
import com.forgerock.opendj.util.CompletedFutureResult;
-import com.forgerock.opendj.util.StaticUtils;
import com.forgerock.opendj.util.Validator;
/**
@@ -109,7 +109,7 @@
private final org.glassfish.grizzly.Connection<?> connection;
private final LDAPWriter ldapWriter = new LDAPWriter();
private final AtomicInteger nextMsgID = new AtomicInteger(1);
- private final LDAPOptions options;
+ private final LDAPConnectionFactoryImpl factory;
private final ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>> pendingRequests =
new ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>>();
private final Object stateLock = new Object();
@@ -120,22 +120,12 @@
private boolean isFailed = false;
private List<ConnectionEventListener> listeners = null;
- /**
- * Creates a new LDAP connection.
- *
- * @param connection
- * The Grizzly connection.
- * @param options
- * The LDAP client options.
- */
- LDAPConnection(final org.glassfish.grizzly.Connection<?> connection, final LDAPOptions options) {
+ LDAPConnection(final org.glassfish.grizzly.Connection<?> connection,
+ final LDAPConnectionFactoryImpl factory) {
this.connection = connection;
- this.options = options;
+ this.factory = factory;
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<Void> abandonAsync(final AbandonRequest request) {
final AbstractLDAPFutureResultImpl<?> pendingRequest;
@@ -148,9 +138,11 @@
pendingRequest = pendingRequests.remove(request.getRequestID());
}
if (pendingRequest == null) {
- // There has never been a request with the specified message ID or
- // the response has already been received and handled. We can ignore
- // this abandon request.
+ /*
+ * There has never been a request with the specified message ID
+ * or the response has already been received and handled. We can
+ * ignore this abandon request.
+ */
// Message ID will be -1 since no request was sent.
return new CompletedFutureResult<Void>((Void) null);
@@ -173,9 +165,6 @@
}
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<Result> addAsync(final AddRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
@@ -208,9 +197,6 @@
return future;
}
- /**
- * {@inheritDoc}
- */
@Override
public void addConnectionEventListener(final ConnectionEventListener listener) {
Validator.ensureNotNull(listener);
@@ -236,9 +222,6 @@
}
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<BindResult> bindAsync(final BindRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
@@ -308,9 +291,6 @@
return future;
}
- /**
- * {@inheritDoc}
- */
@Override
public void close(final UnbindRequest request, final String reason) {
// FIXME: I18N need to internationalize this message.
@@ -321,9 +301,6 @@
"Connection closed by client" + (reason != null ? ": " + reason : "")));
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<CompareResult> compareAsync(final CompareRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
@@ -356,9 +333,6 @@
return future;
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<Result> deleteAsync(final DeleteRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
@@ -391,9 +365,6 @@
return future;
}
- /**
- * {@inheritDoc}
- */
@Override
public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
final ExtendedRequest<R> request,
@@ -447,9 +418,6 @@
return future;
}
- /**
- * {@inheritDoc}
- */
@Override
public boolean isClosed() {
synchronized (stateLock) {
@@ -457,9 +425,6 @@
}
}
- /**
- * {@inheritDoc}
- */
@Override
public boolean isValid() {
synchronized (stateLock) {
@@ -467,9 +432,6 @@
}
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<Result> modifyAsync(final ModifyRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
@@ -502,9 +464,6 @@
return future;
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
@@ -537,9 +496,6 @@
return future;
}
- /**
- * {@inheritDoc}
- */
@Override
public void removeConnectionEventListener(final ConnectionEventListener listener) {
Validator.ensureNotNull(listener);
@@ -550,9 +506,6 @@
}
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<Result> searchAsync(final SearchRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
@@ -585,9 +538,6 @@
return future;
}
- /**
- * {@inheritDoc}
- */
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
@@ -600,7 +550,7 @@
}
long cancelExpiredRequests(final long currentTime) {
- final long timeout = options.getTimeout(TimeUnit.MILLISECONDS);
+ final long timeout = factory.getLDAPOptions().getTimeout(TimeUnit.MILLISECONDS);
long delay = timeout;
if (timeout > 0) {
for (final int requestID : pendingRequests.keySet()) {
@@ -608,10 +558,9 @@
if (future != null) {
final long diff = (future.getTimestamp() + timeout) - currentTime;
if (diff <= 0 && pendingRequests.remove(requestID) != null) {
- StaticUtils.DEBUG_LOG.fine("Cancelling expired future result: " + future);
+ DEBUG_LOG.fine("Cancelling expired future result: " + future);
final Result result = Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT);
future.adaptErrorResult(result);
-
abandonAsync(Requests.newAbandonRequest(future.getRequestID()));
} else {
delay = Math.min(delay, diff);
@@ -691,7 +640,7 @@
// Underlying channel prob blown up. Just ignore.
}
}
- TimeoutChecker.INSTANCE.removeConnection(this);
+ factory.getTimeoutChecker().removeConnection(this);
connection.closeSilently();
// Notify listeners.
@@ -721,7 +670,7 @@
}
LDAPOptions getLDAPOptions() {
- return options;
+ return factory.getLDAPOptions();
}
AbstractLDAPFutureResultImpl<?> getPendingRequest(final Integer messageID) {
@@ -837,12 +786,14 @@
private void checkConnectionIsValid() throws ErrorResultException {
if (!isValid0()) {
if (failedDueToDisconnect) {
- // Connection termination was triggered remotely. We don't want
- // to blindly pass on the result code to requests since it could
- // be confused for a genuine response. For example, if the
- // disconnect contained the invalidCredentials result code then
- // this could be misinterpreted as a genuine authentication
- // failure for subsequent bind requests.
+ /*
+ * Connection termination was triggered remotely. We don't want
+ * to blindly pass on the result code to requests since it could
+ * be confused for a genuine response. For example, if the
+ * disconnect contained the invalidCredentials result code then
+ * this could be misinterpreted as a genuine authentication
+ * failure for subsequent bind requests.
+ */
throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN,
"Connection closed by server");
} else {
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
index 706e9b3..9a60962 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
@@ -27,11 +27,14 @@
package com.forgerock.opendj.ldap;
+import static com.forgerock.opendj.ldap.DefaultTCPNIOTransport.DEFAULT_TRANSPORT;
+import static com.forgerock.opendj.ldap.TimeoutChecker.TIMEOUT_CHECKER;
import static org.forgerock.opendj.ldap.ErrorResultException.*;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLEngine;
@@ -55,6 +58,7 @@
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import com.forgerock.opendj.util.AsynchronousFutureResult;
+import com.forgerock.opendj.util.ReferenceCountedObject;
/**
* LDAP connection factory implementation.
@@ -154,10 +158,14 @@
}
private LDAPConnection adaptConnection(final org.glassfish.grizzly.Connection<?> connection) {
- // Test shows that its much faster with non block writes but risk
- // running out of memory if the server is slow.
+ /*
+ * Test shows that its much faster with non block writes but risk
+ * running out of memory if the server is slow.
+ */
connection.configureBlocking(true);
- final LDAPConnection ldapConnection = new LDAPConnection(connection, options);
+ final LDAPConnection ldapConnection =
+ new LDAPConnection(connection, LDAPConnectionFactoryImpl.this);
+ timeoutChecker.get().addConnection(ldapConnection);
clientFilter.registerConnection(connection, ldapConnection);
return ldapConnection;
}
@@ -194,7 +202,10 @@
private final FilterChain defaultFilterChain;
private final LDAPOptions options;
private final SocketAddress socketAddress;
- private final TCPNIOTransport transport;
+ private final ReferenceCountedObject<TCPNIOTransport>.Reference transport;
+ private final AtomicBoolean isClosed = new AtomicBoolean();
+ private final ReferenceCountedObject<TimeoutChecker>.Reference timeoutChecker = TIMEOUT_CHECKER
+ .acquire();
/**
* Creates a new LDAP connection factory implementation which can be used to
@@ -207,11 +218,7 @@
* The LDAP connection options to use when creating connections.
*/
public LDAPConnectionFactoryImpl(final SocketAddress address, final LDAPOptions options) {
- if (options.getTCPNIOTransport() == null) {
- this.transport = DefaultTCPNIOTransport.getInstance();
- } else {
- this.transport = options.getTCPNIOTransport();
- }
+ this.transport = DEFAULT_TRANSPORT.acquireIfNull(options.getTCPNIOTransport());
this.socketAddress = address;
this.options = new LDAPOptions(options);
this.clientFilter =
@@ -220,9 +227,14 @@
FilterChainBuilder.stateless().add(new TransportFilter()).add(clientFilter).build();
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void close() {
+ if (isClosed.compareAndSet(false, true)) {
+ transport.release();
+ timeoutChecker.release();
+ }
+ }
+
@Override
public Connection getConnection() throws ErrorResultException {
try {
@@ -232,14 +244,12 @@
}
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> handler) {
final SocketConnectorHandler connectorHandler =
- TCPNIOConnectorHandler.builder(transport).processor(defaultFilterChain).build();
+ TCPNIOConnectorHandler.builder(transport.get()).processor(defaultFilterChain)
+ .build();
final AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> future =
new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(handler);
final CompletionHandlerAdapter cha = new CompletionHandlerAdapter(future);
@@ -256,9 +266,14 @@
return socketAddress;
}
- /**
- * {@inheritDoc}
- */
+ TimeoutChecker getTimeoutChecker() {
+ return timeoutChecker.get();
+ }
+
+ LDAPOptions getLDAPOptions() {
+ return options;
+ }
+
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPListenerImpl.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPListenerImpl.java
index 2091060..34ae160 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPListenerImpl.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPListenerImpl.java
@@ -22,14 +22,18 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS
+ * Portions copyright 2011-2013 ForgeRock AS
*/
package com.forgerock.opendj.ldap;
+import static com.forgerock.opendj.ldap.DefaultTCPNIOTransport.DEFAULT_TRANSPORT;
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+
import java.io.Closeable;
import java.io.IOException;
import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.forgerock.opendj.ldap.DecodeOptions;
@@ -43,16 +47,17 @@
import org.glassfish.grizzly.nio.transport.TCPNIOServerConnection;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
-import com.forgerock.opendj.util.StaticUtils;
+import com.forgerock.opendj.util.ReferenceCountedObject;
/**
* LDAP listener implementation.
*/
public final class LDAPListenerImpl implements Closeable {
- private final TCPNIOTransport transport;
+ private final ReferenceCountedObject<TCPNIOTransport>.Reference transport;
private final FilterChain defaultFilterChain;
private final ServerConnectionFactory<LDAPClientContext, Integer> connectionFactory;
private final TCPNIOServerConnection serverConnection;
+ private final AtomicBoolean isClosed = new AtomicBoolean();
/**
* Creates a new LDAP listener implementation which will listen for LDAP
@@ -72,11 +77,7 @@
public LDAPListenerImpl(final SocketAddress address,
final ServerConnectionFactory<LDAPClientContext, Integer> factory,
final LDAPListenerOptions options) throws IOException {
- if (options.getTCPNIOTransport() == null) {
- this.transport = DefaultTCPNIOTransport.getInstance();
- } else {
- this.transport = options.getTCPNIOTransport();
- }
+ this.transport = DEFAULT_TRANSPORT.acquireIfNull(options.getTCPNIOTransport());
this.connectionFactory = factory;
final DecodeOptions decodeOptions = new DecodeOptions(options.getDecodeOptions());
@@ -85,26 +86,22 @@
new LDAPServerFilter(this, new LDAPReader(decodeOptions), options
.getMaxRequestSize())).build();
final TCPNIOBindingHandler bindingHandler =
- TCPNIOBindingHandler.builder(transport).processor(defaultFilterChain).build();
+ TCPNIOBindingHandler.builder(transport.get()).processor(defaultFilterChain).build();
this.serverConnection = bindingHandler.bind(address, options.getBacklog());
}
- /**
- * {@inheritDoc}
- */
@Override
public void close() {
- try {
- serverConnection.close().get();
- } catch (final InterruptedException e) {
- // Cannot handle here.
- Thread.currentThread().interrupt();
- } catch (final Exception e) {
- // Ignore the exception.
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) {
- StaticUtils.DEBUG_LOG.log(Level.WARNING,
- "Exception occurred while closing listener:" + e.getMessage(), e);
+ if (isClosed.compareAndSet(false, true)) {
+ try {
+ serverConnection.close().get();
+ } catch (final InterruptedException e) {
+ // Cannot handle here.
+ Thread.currentThread().interrupt();
+ } catch (final Exception e) {
+ DEBUG_LOG.log(Level.WARNING, "Exception occurred while closing listener", e);
}
+ transport.release();
}
}
@@ -117,9 +114,7 @@
return serverConnection.getLocalAddress();
}
- /**
- * {@inheritDoc}
- */
+ @Override
public String toString() {
final StringBuilder builder = new StringBuilder();
builder.append("LDAPListener(");
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java
index 207a014..4622d44 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java
@@ -22,47 +22,64 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package com.forgerock.opendj.ldap;
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
-import org.glassfish.grizzly.utils.LinkedTransferQueue;
-
-import com.forgerock.opendj.util.StaticUtils;
+import com.forgerock.opendj.util.ReferenceCountedObject;
/**
* Checks connection for pending requests that have timed out.
*/
final class TimeoutChecker {
- static final TimeoutChecker INSTANCE = new TimeoutChecker();
+ static final ReferenceCountedObject<TimeoutChecker> TIMEOUT_CHECKER =
+ new ReferenceCountedObject<TimeoutChecker>() {
+ @Override
+ protected void destroyInstance(final TimeoutChecker instance) {
+ instance.shutdown();
+ }
- private final LinkedTransferQueue<LDAPConnection> connections;
- private transient final ReentrantLock lock;
- private transient final Condition available;
+ @Override
+ protected TimeoutChecker newInstance() {
+ return new TimeoutChecker();
+ }
+ };
+
+ private final Condition available;
+ private final List<LDAPConnection> connections;
+ private final ReentrantLock lock;
+ private boolean shutdownRequested = false;
private TimeoutChecker() {
- this.connections = new LinkedTransferQueue<LDAPConnection>();
+ this.connections = new LinkedList<LDAPConnection>();
this.lock = new ReentrantLock();
this.available = lock.newCondition();
- final Thread checkerThread = new Thread("Timeout Checker") {
+ final Thread checkerThread = new Thread("OpenDJ LDAP SDK Connection Timeout Checker") {
@Override
public void run() {
- StaticUtils.DEBUG_LOG.fine("Timeout Checker Starting");
- final ReentrantLock lock = TimeoutChecker.this.lock;
+ DEBUG_LOG.fine("Timeout Checker Starting");
lock.lock();
try {
- while (true) {
+ while (!shutdownRequested) {
final long currentTime = System.currentTimeMillis();
long delay = 0;
for (final LDAPConnection connection : connections) {
- StaticUtils.DEBUG_LOG.finer("Checking connection " + connection
- + " delay = " + delay);
+ if (DEBUG_LOG.isLoggable(Level.FINER)) {
+ DEBUG_LOG.finer("Checking connection " + connection + " delay = "
+ + delay);
+ }
final long newDelay = connection.cancelExpiredRequests(currentTime);
if (newDelay > 0) {
if (delay > 0) {
@@ -75,15 +92,17 @@
try {
if (delay <= 0) {
- StaticUtils.DEBUG_LOG.finer("There are no connections with "
+ DEBUG_LOG.finer("There are no connections with "
+ "timeout specified. Sleeping");
available.await();
} else {
- StaticUtils.DEBUG_LOG.finer("Sleeping for " + delay + "ms");
+ if (DEBUG_LOG.isLoggable(Level.FINER)) {
+ DEBUG_LOG.log(Level.FINER, "Sleeping for " + delay + " ms");
+ }
available.await(delay, TimeUnit.MILLISECONDS);
}
} catch (final InterruptedException e) {
- // Just go around again.
+ shutdownRequested = true;
}
}
} finally {
@@ -97,7 +116,6 @@
}
void addConnection(final LDAPConnection connection) {
- final ReentrantLock lock = this.lock;
lock.lock();
try {
connections.add(connection);
@@ -108,7 +126,6 @@
}
void removeConnection(final LDAPConnection connection) {
- final ReentrantLock lock = this.lock;
lock.lock();
try {
connections.remove(connection);
@@ -116,4 +133,14 @@
lock.unlock();
}
}
+
+ private void shutdown() {
+ lock.lock();
+ try {
+ shutdownRequested = true;
+ available.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/ReferenceCountedObject.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/ReferenceCountedObject.java
new file mode 100644
index 0000000..0bdd3e7
--- /dev/null
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/ReferenceCountedObject.java
@@ -0,0 +1,162 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2013 ForgeRock AS.
+ */
+
+package com.forgerock.opendj.util;
+
+/**
+ * An object which is lazily created when first referenced, and destroyed when
+ * the last reference is released.
+ *
+ * @param <T>
+ * The type of referenced object.
+ */
+public abstract class ReferenceCountedObject<T> {
+
+ /**
+ * A reference to the reference counted object which will automatically be
+ * released during garbage collection.
+ */
+ public final class Reference {
+ private T value;
+
+ private Reference(final T value) {
+ this.value = value;
+ }
+
+ /**
+ * Returns the referenced object.
+ *
+ * @return The referenced object.
+ * @throws NullPointerException
+ * If the referenced object has already been released.
+ */
+ public T get() {
+ if (value == null) {
+ throw new NullPointerException(); // Fail-fast.
+ }
+ return value;
+ }
+
+ /**
+ * Decrements the reference count for the reference counted object if
+ * this reference refers to the reference counted instance. If the
+ * reference count drops to zero then the referenced object will be
+ * destroyed.
+ */
+ public void release() {
+ releaseIfSame(value);
+
+ /*
+ * Force NPE for subsequent get() attempts and prevent multiple
+ * releases.
+ */
+ value = null;
+ }
+
+ /**
+ * Provide a finalizer because reference counting is intended for
+ * expensive rarely created resources which should not be accidentally
+ * left around.
+ */
+ @Override
+ protected void finalize() {
+ release();
+ }
+ }
+
+ private T instance = null;
+ private final Object lock = new Object();
+ private int refCount = 0;
+
+ /**
+ * Creates a new referenced object whose reference count is initially zero.
+ */
+ protected ReferenceCountedObject() {
+ // Nothing to do.
+ }
+
+ /**
+ * Returns a reference to the reference counted object.
+ *
+ * @return A reference to the reference counted object.
+ */
+ public final Reference acquire() {
+ synchronized (lock) {
+ if (refCount++ == 0) {
+ assert instance == null;
+ instance = newInstance();
+ }
+ return new Reference(instance);
+ }
+ }
+
+ /**
+ * Returns a reference to the provided object or, if it is {@code null}, a
+ * reference to the reference counted object.
+ *
+ * @param value
+ * The object to be referenced, or {@code null} if the reference
+ * counted object should be used.
+ * @return A reference to the provided object or, if it is {@code null}, a
+ * reference to the reference counted object.
+ */
+ public final Reference acquireIfNull(final T value) {
+ return value != null ? new Reference(value) : acquire();
+ }
+
+ /**
+ * Invoked when a reference is released and the reference count will become
+ * zero. Implementations should release any resources associated with the
+ * resource and should not return until the resources have been released.
+ *
+ * @param instance
+ * The instance to be destroyed.
+ */
+ protected abstract void destroyInstance(T instance);
+
+ /**
+ * Invoked when a reference is acquired and the current reference count is
+ * zero. Implementations should create a new instance as fast as possible.
+ *
+ * @return The new instance.
+ */
+ protected abstract T newInstance();
+
+ private final void releaseIfSame(final T instance) {
+ T instanceToRelease = null;
+ synchronized (lock) {
+ if (this.instance == instance) {
+ if (--refCount == 0) {
+ instanceToRelease = instance;
+ this.instance = null;
+ }
+ }
+ }
+ if (instanceToRelease != null) {
+ destroyInstance(instanceToRelease);
+ }
+ }
+
+}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/StaticUtils.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/StaticUtils.java
index 847ad6d..f89e2b0 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/StaticUtils.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/StaticUtils.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS
+ * Portions copyright 2011-2013 ForgeRock AS
*/
package com.forgerock.opendj.util;
@@ -48,6 +48,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -86,9 +87,30 @@
// UTC TimeZone is assumed to never change over JVM lifetime
private static final TimeZone TIME_ZONE_UTC_OBJ = TimeZone.getTimeZone(TIME_ZONE_UTC);
- private static ScheduledExecutorService defaultScheduler = null;
+ /**
+ * The default scheduler which should be used when the application does not
+ * provide one.
+ */
+ public static final ReferenceCountedObject<ScheduledExecutorService> DEFAULT_SCHEDULER =
+ new ReferenceCountedObject<ScheduledExecutorService>() {
- private static final Object DEFAULT_SCHEDULER_LOCK = new Object();
+ @Override
+ protected ScheduledExecutorService newInstance() {
+ final ThreadFactory factory =
+ newThreadFactory(null, "OpenDJ LDAP SDK Default Scheduler", true);
+ return Executors.newSingleThreadScheduledExecutor(factory);
+ }
+
+ @Override
+ protected void destroyInstance(ScheduledExecutorService instance) {
+ instance.shutdown();
+ try {
+ instance.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ };
/**
* Retrieves a string representation of the provided byte in hexadecimal.
@@ -1395,22 +1417,6 @@
}
/**
- * Returns the default scheduler which should be used by the SDK.
- *
- * @return The default scheduler.
- */
- public static ScheduledExecutorService getDefaultScheduler() {
- synchronized (DEFAULT_SCHEDULER_LOCK) {
- if (defaultScheduler == null) {
- final ThreadFactory factory =
- newThreadFactory(null, "OpenDJ SDK Default Scheduler", true);
- defaultScheduler = Executors.newSingleThreadScheduledExecutor(factory);
- }
- }
- return defaultScheduler;
- }
-
- /**
* Retrieves the best human-readable message for the provided exception. For
* exceptions defined in the OpenDJ project, it will attempt to use the
* message (combining it with the message ID if available). For some
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractConnectionWrapper.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractConnectionWrapper.java
index 5de8c93..67f7f74 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractConnectionWrapper.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractConnectionWrapper.java
@@ -55,12 +55,15 @@
* An abstract base class from which connection wrappers may be easily
* implemented. The default implementation of each method is to delegate to the
* wrapped connection.
+ *
+ * @param <C>
+ * The type of wrapped connection.
*/
-public abstract class AbstractConnectionWrapper implements Connection {
+public abstract class AbstractConnectionWrapper<C extends Connection> implements Connection {
/**
* The wrapped connection.
*/
- protected final Connection connection;
+ protected final C connection;
/**
* Creates a new connection wrapper.
@@ -68,7 +71,7 @@
* @param connection
* The connection to be wrapped.
*/
- protected AbstractConnectionWrapper(final Connection connection) {
+ protected AbstractConnectionWrapper(final C connection) {
Validator.ensureNotNull(connection);
this.connection = connection;
}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
index aead5f3..a868df4 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
@@ -22,11 +22,13 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS.
+ * Portions copyright 2011-2013 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
import static org.forgerock.opendj.ldap.ErrorResultException.*;
import java.util.ArrayList;
@@ -39,7 +41,7 @@
import java.util.logging.Level;
import com.forgerock.opendj.util.AsynchronousFutureResult;
-import com.forgerock.opendj.util.StaticUtils;
+import com.forgerock.opendj.util.ReferenceCountedObject;
import com.forgerock.opendj.util.Validator;
/**
@@ -55,11 +57,8 @@
ResultHandler<Connection> {
private final ConnectionFactory factory;
-
private final AtomicBoolean isOperational = new AtomicBoolean(true);
-
private volatile FutureResult<?> pendingConnectFuture = null;
-
private final int index;
private MonitoredConnectionFactory(final ConnectionFactory factory, final int index) {
@@ -67,9 +66,12 @@
this.index = index;
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void close() {
+ // Should we cancel the future?
+ factory.close();
+ }
+
@Override
public Connection getConnection() throws ErrorResultException {
final Connection connection;
@@ -87,14 +89,12 @@
return connection;
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> resultHandler) {
final AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> future =
- new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(resultHandler);
+ new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(
+ resultHandler);
final ResultHandler<Connection> failoverHandler = new ResultHandler<Connection>() {
@Override
@@ -141,9 +141,6 @@
connection.close();
}
- /**
- * {@inheritDoc}
- */
@Override
public String toString() {
return factory.toString();
@@ -156,9 +153,9 @@
private synchronized void checkIfAvailable() {
if (!isOperational.get()
&& (pendingConnectFuture == null || pendingConnectFuture.isDone())) {
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
- StaticUtils.DEBUG_LOG.fine(String
- .format("Attempting reconnect to offline factory " + this));
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format("Attempting reconnect to offline factory '%s'",
+ this));
}
pendingConnectFuture = factory.getConnectionAsync(this);
}
@@ -167,21 +164,22 @@
private void notifyOffline(final ErrorResultException error) {
if (isOperational.getAndSet(false)) {
// Transition from online to offline.
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) {
- StaticUtils.DEBUG_LOG.warning(String.format("Connection factory " + factory
- + " is no longer operational: " + error.getMessage()));
+ if (DEBUG_LOG.isLoggable(Level.WARNING)) {
+ DEBUG_LOG.warning(String.format(
+ "Connection factory '%s' is no longer operational: %s", factory, error
+ .getMessage()));
}
synchronized (stateLock) {
offlineFactoriesCount++;
if (offlineFactoriesCount == 1) {
// Enable monitoring.
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
- StaticUtils.DEBUG_LOG.fine(String.format("Starting monitoring thread"));
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format("Starting monitoring thread"));
}
monitoringFuture =
- scheduler.scheduleWithFixedDelay(new MonitorRunnable(), 0,
+ scheduler.get().scheduleWithFixedDelay(new MonitorRunnable(), 0,
monitoringInterval, monitoringIntervalTimeUnit);
}
}
@@ -191,16 +189,16 @@
private void notifyOnline() {
if (!isOperational.getAndSet(true)) {
// Transition from offline to online.
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.INFO)) {
- StaticUtils.DEBUG_LOG.info(String.format("Connection factory " + factory
- + " is now operational"));
+ if (DEBUG_LOG.isLoggable(Level.INFO)) {
+ DEBUG_LOG.info(String.format("Connection factory'%s' is now operational",
+ factory));
}
synchronized (stateLock) {
offlineFactoriesCount--;
if (offlineFactoriesCount == 0) {
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
- StaticUtils.DEBUG_LOG.fine(String.format("Stopping monitoring thread"));
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format("Stopping monitoring thread"));
}
monitoringFuture.cancel(false);
@@ -225,91 +223,65 @@
}
private final List<MonitoredConnectionFactory> monitoredFactories;
-
- private final ScheduledExecutorService scheduler;
-
+ private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
private final Object stateLock = new Object();
-
- // Guarded by stateLock.
- private int offlineFactoriesCount = 0;
-
- private final long monitoringInterval;
-
- private final TimeUnit monitoringIntervalTimeUnit;
-
- // Guarded by stateLock.
- private ScheduledFuture<?> monitoringFuture;
-
/**
- * Creates a new abstract load balancing algorithm which will monitor
- * offline connection factories every second using the default scheduler.
- *
- * @param factories
- * The connection factories.
+ * Guarded by stateLock.
*/
+ private int offlineFactoriesCount = 0;
+ private final long monitoringInterval;
+ private final TimeUnit monitoringIntervalTimeUnit;
+ /**
+ * Guarded by stateLock.
+ */
+ private ScheduledFuture<?> monitoringFuture;
+ private AtomicBoolean isClosed = new AtomicBoolean();
+
AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories) {
- this(factories, 1, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler());
+ this(factories, 1, TimeUnit.SECONDS, null);
}
- /**
- * Creates a new abstract load balancing algorithm which will monitor
- * offline connection factories using the specified frequency using the
- * default scheduler.
- *
- * @param factories
- * The connection factories.
- * @param interval
- * The interval between attempts to poll offline factories.
- * @param unit
- * The time unit for the interval between attempts to poll
- * offline factories.
- */
AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
final long interval, final TimeUnit unit) {
- this(factories, interval, unit, StaticUtils.getDefaultScheduler());
+ this(factories, interval, unit, null);
}
- /**
- * Creates a new abstract load balancing algorithm which will monitor
- * offline connection factories using the specified frequency and scheduler.
- *
- * @param factories
- * The connection factories.
- * @param interval
- * The interval between attempts to poll offline factories.
- * @param unit
- * The time unit for the interval between attempts to poll
- * offline factories.
- * @param scheduler
- * The scheduler which should for periodically monitoring dead
- * connection factories to see if they are usable again.
- */
AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
final long interval, final TimeUnit unit, final ScheduledExecutorService scheduler) {
- Validator.ensureNotNull(factories, scheduler, unit);
+ Validator.ensureNotNull(factories, unit);
this.monitoredFactories = new ArrayList<MonitoredConnectionFactory>(factories.size());
int i = 0;
for (final ConnectionFactory f : factories) {
this.monitoredFactories.add(new MonitoredConnectionFactory(f, i++));
}
- this.scheduler = scheduler;
+ this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
this.monitoringInterval = interval;
this.monitoringIntervalTimeUnit = unit;
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void close() {
+ if (isClosed.compareAndSet(false, true)) {
+ synchronized (stateLock) {
+ if (monitoringFuture != null) {
+ monitoringFuture.cancel(false);
+ monitoringFuture = null;
+ }
+ }
+ for (ConnectionFactory factory : monitoredFactories) {
+ factory.close();
+ }
+ scheduler.release();
+ }
+ }
+
@Override
public final ConnectionFactory getConnectionFactory() throws ErrorResultException {
final int index = getInitialConnectionFactoryIndex();
return getMonitoredConnectionFactory(index);
}
- /**
- * {@inheritDoc}
- */
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
@@ -357,9 +329,11 @@
index = (index + 1) % maxIndex;
} while (index != initialIndex);
- // All factories are offline so give up. We could have a
- // configurable policy here such as waiting indefinitely, or for a
- // configurable timeout period.
+ /*
+ * All factories are offline so give up. We could have a configurable
+ * policy here such as waiting indefinitely, or for a configurable
+ * timeout period.
+ */
throw newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR,
"No operational connection factories available");
}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AuthenticatedConnectionFactory.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AuthenticatedConnectionFactory.java
index f4962ab..d0eb99d 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AuthenticatedConnectionFactory.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AuthenticatedConnectionFactory.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS.
+ * Portions copyright 2011-2013 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
@@ -51,43 +51,34 @@
* An authenticated connection supports all operations except Bind
* operations.
*/
- public static final class AuthenticatedConnection extends AbstractConnectionWrapper {
+ public static final class AuthenticatedConnection extends AbstractConnectionWrapper<Connection> {
private AuthenticatedConnection(final Connection connection) {
super(connection);
}
- /**
+ /*
* Bind operations are not supported by pre-authenticated connections.
- * This method will always throw {@code UnsupportedOperationException}.
+ * These methods will always throw {@code UnsupportedOperationException}.
*/
- /**
- * {@inheritDoc}
- */
public FutureResult<BindResult> bindAsync(final BindRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final ResultHandler<? super BindResult> resultHandler) {
throw new UnsupportedOperationException();
}
- /**
- * {@inheritDoc}
- */
+
public BindResult bind(BindRequest request) throws ErrorResultException {
throw new UnsupportedOperationException();
}
- /**
- * {@inheritDoc}
- */
+
public BindResult bind(String name, char[] password) throws ErrorResultException {
throw new UnsupportedOperationException();
}
- /**
- * {@inheritDoc}
- */
+
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AuthenticatedConnection(");
@@ -100,11 +91,8 @@
private static final class FutureResultImpl {
private final FutureResultTransformer<BindResult, Connection> futureBindResult;
-
private final RecursiveFutureResult<Connection, BindResult> futureConnectionResult;
-
private final BindRequest bindRequest;
-
private Connection connection;
private FutureResultImpl(final BindRequest request,
@@ -148,7 +136,6 @@
}
private final BindRequest request;
-
private final ConnectionFactory parentFactory;
/**
@@ -169,9 +156,12 @@
this.request = request;
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void close() {
+ // Delegate.
+ parentFactory.close();
+ }
+
public Connection getConnection() throws ErrorResultException {
final Connection connection = parentFactory.getConnection();
boolean bindSucceeded = false;
@@ -183,14 +173,15 @@
connection.close();
}
}
- // If the bind didn't succeed then an exception will have been thrown
- // and this line will not be reached.
+
+ /*
+ * If the bind didn't succeed then an exception will have been thrown
+ * and this line will not be reached.
+ */
return new AuthenticatedConnection(connection);
}
- /**
- * {@inheritDoc}
- */
+
@Override
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> handler) {
@@ -200,9 +191,7 @@
return future.futureBindResult;
}
- /**
- * {@inheritDoc}
- */
+
public String toString() {
final StringBuilder builder = new StringBuilder();
builder.append("AuthenticatedConnectionFactory(");
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connection.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connection.java
index 0afa53e..471bb50 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connection.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connection.java
@@ -439,7 +439,10 @@
*
* Calling {@code close} on a connection that is already closed has no
* effect.
+ *
+ * @see Connections#uncloseable(Connection)
*/
+ @Override
void close();
/**
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionFactory.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionFactory.java
index 3be6d5c..af6bf07 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionFactory.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionFactory.java
@@ -22,11 +22,13 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS.
+ * Portions copyright 2011-2013 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
+import java.io.Closeable;
+
/**
* A connection factory provides an interface for obtaining a connection to a
* Directory Server. Connection factories can be used to wrap other connection
@@ -49,7 +51,28 @@
* should aim to close connections as soon as possible in order to avoid
* resource contention.
*/
-public interface ConnectionFactory {
+public interface ConnectionFactory extends Closeable {
+
+ /**
+ * Releases any resources associated with this connection factory. Depending
+ * on the implementation a factory may:
+ * <ul>
+ * <li>do nothing
+ * <li>close underlying connection factories (e.g. load-balancers)
+ * <li>close pooled connections (e.g. connection pools)
+ * <li>shutdown IO event service and related thread pools (e.g. Grizzly).
+ * </ul>
+ * Calling {@code close} on a connection factory which is already closed has
+ * no effect.
+ * <p>
+ * Applications should avoid closing connection factories while there are
+ * remaining active connections in use or connection attempts in progress.
+ *
+ * @see Connections#uncloseable(ConnectionFactory)
+ */
+ @Override
+ public void close();
+
/**
* Asynchronously obtains a connection to the Directory Server associated
* with this connection factory. The returned {@code FutureResult} can be
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
index 98e574e..d9fc7f1 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
@@ -21,13 +21,11 @@
* CDDL HEADER END
*
*
- * Copyright 2011-2012 ForgeRock AS
+ * Copyright 2011-2013 ForgeRock AS
*/
package org.forgerock.opendj.ldap;
-import java.io.Closeable;
-
/**
* A connection factory which maintains and re-uses a pool of connections.
* Connections obtained from a connection pool are returned to the connection
@@ -41,7 +39,7 @@
* Since pooled connections are re-used, applications must use operations such
* as binds and StartTLS with extreme caution.
*/
-public interface ConnectionPool extends ConnectionFactory, Closeable {
+public interface ConnectionPool extends ConnectionFactory {
/**
* Releases any resources associated with this connection pool. Pooled
* connections will be permanently closed and this connection pool will no
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
index deea05a..42f31b2 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -386,6 +386,11 @@
return new ConnectionFactory() {
@Override
+ public void close() {
+ factory.close();
+ }
+
+ @Override
public Connection getConnection() throws ErrorResultException {
return factory.getConnection();
}
@@ -396,9 +401,6 @@
return factory.getConnectionAsync(handler);
}
- /**
- * {@inheritDoc}
- */
@Override
public String toString() {
return name;
@@ -483,7 +485,7 @@
* @return An uncloseable view of the provided connection.
*/
public static Connection uncloseable(Connection connection) {
- return new AbstractConnectionWrapper(connection) {
+ return new AbstractConnectionWrapper<Connection>(connection) {
@Override
public void close() {
// Do nothing.
@@ -496,6 +498,36 @@
};
}
+ /**
+ * Returns an uncloseable view of the provided connection factory. Attempts
+ * to call {@link ConnectionFactory#close()} will be ignored.
+ *
+ * @param factory
+ * The connection factory whose {@code close} method is to be
+ * disabled.
+ * @return An uncloseable view of the provided connection factory.
+ */
+ public static ConnectionFactory uncloseable(final ConnectionFactory factory) {
+ return new ConnectionFactory() {
+
+ @Override
+ public FutureResult<Connection> getConnectionAsync(
+ ResultHandler<? super Connection> handler) {
+ return factory.getConnectionAsync(handler);
+ }
+
+ @Override
+ public Connection getConnection() throws ErrorResultException {
+ return factory.getConnection();
+ }
+
+ @Override
+ public void close() {
+ // Do nothing.
+ }
+ };
+ }
+
// Prevent instantiation.
private Connections() {
// Do nothing.
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
index 42ad688..5225af8 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -75,18 +75,17 @@
* the pool completes.
*/
private final class ConnectionResultHandler implements ResultHandler<Connection> {
- /**
- * {@inheritDoc}
- */
+
@Override
public void handleErrorResult(final ErrorResultException error) {
// Connection attempt failed, so decrease the pool size.
currentPoolSize.release();
if (DEBUG_LOG.isLoggable(Level.FINE)) {
- DEBUG_LOG.fine(String.format("Connection attempt failed: " + error.getMessage()
- + " currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ DEBUG_LOG.fine(String.format(
+ "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", error
+ .getMessage(), poolSize - currentPoolSize.availablePermits(),
+ poolSize));
}
QueueElement holder;
@@ -103,17 +102,13 @@
holder.getWaitingFuture().handleErrorResult(error);
}
- /**
- * {@inheritDoc}
- */
@Override
public void handleResult(final Connection connection) {
if (DEBUG_LOG.isLoggable(Level.FINE)) {
- DEBUG_LOG.fine(String.format("Connection attempt succeeded: "
- + " currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ DEBUG_LOG.fine(String.format(
+ "Connection attempt succeeded: currentPoolSize=%d, poolSize=%d", poolSize
+ - currentPoolSize.availablePermits(), poolSize));
}
-
publishConnection(connection);
}
}
@@ -173,10 +168,12 @@
notifyErrorOccurred = error != null;
if (!notifyClose) {
if (listeners == null) {
- // Create and register first listener. If an error has
- // already occurred on the underlying connection, then
- // the listener may be immediately invoked so ensure
- // that it is already in the list.
+ /*
+ * Create and register first listener. If an error has
+ * already occurred on the underlying connection, then
+ * the listener may be immediately invoked so ensure
+ * that it is already in the list.
+ */
listeners = new CopyOnWriteArrayList<ConnectionEventListener>();
listeners.add(listener);
connection.addConnectionEventListener(this);
@@ -235,8 +232,10 @@
tmpListeners = listeners;
}
- // Remove underlying listener if needed and do this before
- // subsequent connection events may occur.
+ /*
+ * Remove underlying listener if needed and do this before
+ * subsequent connection events may occur.
+ */
if (tmpListeners != null) {
connection.removeConnectionEventListener(this);
}
@@ -245,18 +244,20 @@
if (connection.isValid()) {
publishConnection(connection);
} else {
- // The connection may have been disconnected by the remote
- // server, but the server may still be available. In order to
- // avoid leaving pending futures hanging indefinitely, we should
- // try to reconnect immediately. No need to release/acquire
- // currentPoolSize.
+ /*
+ * The connection may have been disconnected by the remote
+ * server, but the server may still be available. In order to
+ * avoid leaving pending futures hanging indefinitely, we should
+ * try to reconnect immediately. No need to release/acquire
+ * currentPoolSize.
+ */
connection.close();
factory.getConnectionAsync(connectionResultHandler);
if (DEBUG_LOG.isLoggable(Level.FINE)) {
- DEBUG_LOG.fine(String.format("Connection no longer valid. "
- + "currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ DEBUG_LOG.fine(String.format(
+ "Connection no longer valid: currentPoolSize=%d, poolSize=%d", poolSize
+ - currentPoolSize.availablePermits(), poolSize));
}
}
@@ -542,7 +543,9 @@
}
QueueElement(final ResultHandler<? super Connection> handler) {
- this.value = new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(handler);
+ this.value =
+ new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(
+ handler);
}
@Override
@@ -575,24 +578,12 @@
private final int poolSize;
private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
- /**
- * Creates a new connection pool which will maintain {@code poolSize}
- * connections created using the provided connection factory.
- *
- * @param factory
- * The connection factory to use for creating new connections.
- * @param poolSize
- * The maximum size of the connection pool.
- */
FixedConnectionPool(final ConnectionFactory factory, final int poolSize) {
this.factory = factory;
this.poolSize = poolSize;
this.currentPoolSize = new Semaphore(poolSize);
}
- /**
- * {@inheritDoc}
- */
@Override
public void close() {
final LinkedList<Connection> idleConnections;
@@ -602,8 +593,10 @@
}
isClosed = true;
- // Remove any connections which are waiting in the queue as these
- // can be closed immediately.
+ /*
+ * Remove any connections which are waiting in the queue as these
+ * can be closed immediately.
+ */
idleConnections = new LinkedList<Connection>();
while (hasWaitingConnections()) {
final QueueElement holder = queue.removeFirst();
@@ -621,11 +614,11 @@
for (final Connection connection : idleConnections) {
closeConnection(connection);
}
+
+ // Close the underlying factory.
+ factory.close();
}
- /**
- * {@inheritDoc}
- */
@Override
public Connection getConnection() throws ErrorResultException {
try {
@@ -635,9 +628,6 @@
}
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> handler) {
@@ -672,9 +662,9 @@
currentPoolSize.release();
if (DEBUG_LOG.isLoggable(Level.FINE)) {
- DEBUG_LOG.fine(String.format("Connection no longer valid. "
- + "currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ DEBUG_LOG.fine(String.format(
+ "Connection no longer valid: currentPoolSize=%d, poolSize=%d",
+ poolSize - currentPoolSize.availablePermits(), poolSize));
}
}
} else {
@@ -688,9 +678,6 @@
}
}
- /**
- * {@inheritDoc}
- */
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
@@ -719,7 +706,7 @@
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format("Closing connection because connection pool is closing: "
- + " currentPoolSize=%d, poolSize=%d", poolSize
+ + "currentPoolSize=%d, poolSize=%d", poolSize
- currentPoolSize.availablePermits(), poolSize));
}
}
@@ -763,14 +750,14 @@
holder.getWaitingFuture().handleErrorResult(e);
if (DEBUG_LOG.isLoggable(Level.FINE)) {
- DEBUG_LOG.fine(String.format("Connection attempt failed: " + e.getMessage()
- + " currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ DEBUG_LOG.fine(String.format(
+ "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", e
+ .getMessage(), poolSize - currentPoolSize.availablePermits(),
+ poolSize));
}
}
} else {
- final PooledConnection pooledConnection = new PooledConnection(connection);
- holder.getWaitingFuture().handleResult(pooledConnection);
+ holder.getWaitingFuture().handleResult(new PooledConnection(connection));
}
}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
index 9f73c51..669195f 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS.
+ * Portions copyright 2011-2013 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
@@ -40,6 +40,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.logging.Level;
@@ -64,7 +65,7 @@
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.FutureResultTransformer;
-import com.forgerock.opendj.util.StaticUtils;
+import com.forgerock.opendj.util.ReferenceCountedObject;
import com.forgerock.opendj.util.Validator;
/**
@@ -75,7 +76,7 @@
/**
* A connection that sends heart beats and supports all operations.
*/
- private final class ConnectionImpl extends AbstractConnectionWrapper implements
+ private final class ConnectionImpl extends AbstractConnectionWrapper<Connection> implements
ConnectionEventListener, SearchResultHandler {
/**
@@ -85,9 +86,8 @@
* @param <R>
* The type of result returned by the request.
*/
- private abstract class DelayedFuture<R extends Result>
- extends AsynchronousFutureResult<R, ResultHandler<? super R>>
- implements Runnable {
+ private abstract class DelayedFuture<R extends Result> extends
+ AsynchronousFutureResult<R, ResultHandler<? super R>> implements Runnable {
private volatile FutureResult<R> innerFuture = null;
protected DelayedFuture(final ResultHandler<? super R> handler) {
@@ -123,14 +123,19 @@
}
- // List of pending Bind or StartTLS requests which must be invoked
- // when the current heart beat completes.
+ /*
+ * List of pending Bind or StartTLS requests which must be invoked when
+ * the current heart beat completes.
+ */
private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>();
- // Coordinates heart-beats with Bind and StartTLS requests.
+ /* Coordinates heart-beats with Bind and StartTLS requests. */
private final Sync sync = new Sync();
- // Timestamp of last response received (any response, not just heart beats).
+ /*
+ * Timestamp of last response received (any response, not just heart
+ * beats).
+ */
private volatile long timestamp = currentTimeMillis(); // Assume valid at creation.
private ConnectionImpl(final Connection connection) {
@@ -206,8 +211,10 @@
return connection.bindAsync(request, intermediateResponseHandler, timestamper(
resultHandler, true));
} else {
- // A heart beat must be in progress so create a runnable task
- // which will be executed when the heart beat completes.
+ /*
+ * A heart beat must be in progress so create a runnable task
+ * which will be executed when the heart beat completes.
+ */
final DelayedFuture<BindResult> future =
new DelayedFuture<BindResult>(resultHandler) {
@Override
@@ -216,7 +223,10 @@
timestamper(this, true));
}
};
- // Enqueue and flush if the heart beat has completed in the mean time.
+ /*
+ * Enqueue and flush if the heart beat has completed in the mean
+ * time.
+ */
pendingRequests.offer(future);
flushPendingRequests();
return future;
@@ -342,8 +352,11 @@
return connection.extendedRequestAsync(request, intermediateResponseHandler,
timestamper(resultHandler, true));
} else {
- // A heart beat must be in progress so create a runnable task
- // which will be executed when the heart beat completes.
+ /*
+ * A heart beat must be in progress so create a runnable
+ * task which will be executed when the heart beat
+ * completes.
+ */
final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
@Override
public FutureResult<R> dispatch() {
@@ -351,7 +364,11 @@
intermediateResponseHandler, timestamper(this, true));
}
};
- // Enqueue and flush if the heart beat has completed in the mean time.
+
+ /*
+ * Enqueue and flush if the heart beat has completed in the
+ * mean time.
+ */
pendingRequests.offer(future);
flushPendingRequests();
return future;
@@ -382,7 +399,7 @@
@Override
public void handleErrorResult(final ErrorResultException error) {
if (DEBUG_LOG.isLoggable(Level.FINE)) {
- DEBUG_LOG.fine(String.format("Heartbeat failed: " + error.getMessage()));
+ DEBUG_LOG.fine(String.format("Heartbeat failed: %s", error.getMessage()));
}
updateTimestamp();
releaseHeartBeatLock();
@@ -582,8 +599,10 @@
}
private void acquireBindOrStartTLSLock() throws ErrorResultException {
- // Wait for pending heartbeats and prevent new heartbeats from
- // being sent while the bind is in progress.
+ /*
+ * Wait for pending heartbeats and prevent new heartbeats from being
+ * sent while the bind is in progress.
+ */
try {
if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) {
// Give up - it looks like the connection is dead.
@@ -597,8 +616,11 @@
private void flushPendingRequests() {
if (!pendingRequests.isEmpty()) {
- // The pending requests will acquire the shared lock, but we take
- // it here anyway to ensure that pending requests do not get blocked.
+ /*
+ * The pending requests will acquire the shared lock, but we
+ * take it here anyway to ensure that pending requests do not
+ * get blocked.
+ */
if (sync.tryLockShared()) {
try {
Runnable pendingRequest;
@@ -617,7 +639,10 @@
connection.removeConnectionEventListener(this);
activeConnections.remove(this);
if (activeConnections.isEmpty()) {
- // This is the last active connection, so stop the heartbeat.
+ /*
+ * This is the last active connection, so stop the
+ * heartbeat.
+ */
heartBeatFuture.cancel(false);
}
}
@@ -633,22 +658,33 @@
}
private void sendHeartBeat() {
- // Only send the heartbeat if the connection has been idle for some time.
+ /*
+ * Only send the heartbeat if the connection has been idle for some
+ * time.
+ */
if (currentTimeMillis() < (timestamp + minDelayMS)) {
return;
}
- // Don't send a heart beat if there is already a heart beat,
- // bind, or startTLS in progress. Note that the bind/startTLS
- // response will update the timestamp as if it were a heart beat.
+ /*
+ * Don't send a heart beat if there is already a heart beat, bind,
+ * or startTLS in progress. Note that the bind/startTLS response
+ * will update the timestamp as if it were a heart beat.
+ */
if (sync.tryLockExclusively()) {
try {
connection.searchAsync(heartBeatRequest, null, this);
} catch (final Exception e) {
- // This may happen when we attempt to send the heart beat just
- // after the connection is closed but before we are notified.
+ /*
+ * This may happen when we attempt to send the heart beat
+ * just after the connection is closed but before we are
+ * notified.
+ */
- // Release the lock because we're never going to get a response.
+ /*
+ * Release the lock because we're never going to get a
+ * response.
+ */
releaseHeartBeatLock();
}
}
@@ -755,7 +791,7 @@
* </ul>
*/
private static final class Sync extends AbstractQueuedSynchronizer {
- // Lock states. Positive values indicate that the shared lock is taken.
+ /* Lock states. Positive values indicate that the shared lock is taken. */
private static final int UNLOCKED = 0; // initial state
private static final int LOCKED_EXCLUSIVELY = -1;
@@ -809,8 +845,10 @@
}
final int newState = state - 1;
if (compareAndSetState(state, newState)) {
- // We could always return true here, but since there cannot
- // be waiting readers we can specialize for waiting writers.
+ /*
+ * We could always return true here, but since there cannot
+ * be waiting readers we can specialize for waiting writers.
+ */
return newState == UNLOCKED;
}
}
@@ -851,83 +889,29 @@
private final SearchRequest heartBeatRequest;
private final long interval;
private final long minDelayMS;
- private final ScheduledExecutorService scheduler;
+ private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
private final long timeoutMS;
private final TimeUnit unit;
+ private AtomicBoolean isClosed = new AtomicBoolean();
- /**
- * Creates a new heart-beat connection factory which will create connections
- * using the provided connection factory and periodically ping any created
- * connections in order to detect that they are still alive every 10 seconds
- * using the default scheduler.
- *
- * @param factory
- * The connection factory to use for creating connections.
- */
HeartBeatConnectionFactory(final ConnectionFactory factory) {
- this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler());
+ this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, null);
}
- /**
- * Creates a new heart-beat connection factory which will create connections
- * using the provided connection factory and periodically ping any created
- * connections in order to detect that they are still alive using the
- * specified frequency and the default scheduler.
- *
- * @param factory
- * The connection factory to use for creating connections.
- * @param interval
- * The interval between keepalive pings.
- * @param unit
- * The time unit for the interval between keepalive pings.
- */
HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
final TimeUnit unit) {
- this(factory, interval, unit, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler());
+ this(factory, interval, unit, DEFAULT_SEARCH, null);
}
- /**
- * Creates a new heart-beat connection factory which will create connections
- * using the provided connection factory and periodically ping any created
- * connections using the specified search request in order to detect that
- * they are still alive.
- *
- * @param factory
- * The connection factory to use for creating connections.
- * @param interval
- * The interval between keepalive pings.
- * @param unit
- * The time unit for the interval between keepalive pings.
- * @param heartBeat
- * The search request to use for keepalive pings.
- */
HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
final TimeUnit unit, final SearchRequest heartBeat) {
- this(factory, interval, unit, heartBeat, StaticUtils.getDefaultScheduler());
+ this(factory, interval, unit, heartBeat, null);
}
- /**
- * Creates a new heart-beat connection factory which will create connections
- * using the provided connection factory and periodically ping any created
- * connections using the specified search request in order to detect that
- * they are still alive.
- *
- * @param factory
- * The connection factory to use for creating connections.
- * @param interval
- * The interval between keepalive pings.
- * @param unit
- * The time unit for the interval between keepalive pings.
- * @param heartBeat
- * The search request to use for keepalive pings.
- * @param scheduler
- * The scheduler which should for periodically sending keepalive
- * pings.
- */
HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
final TimeUnit unit, final SearchRequest heartBeat,
final ScheduledExecutorService scheduler) {
- Validator.ensureNotNull(factory, heartBeat, unit, scheduler);
+ Validator.ensureNotNull(factory, heartBeat, unit);
Validator.ensureTrue(interval >= 0, "negative timeout");
this.heartBeatRequest = heartBeat;
@@ -935,22 +919,34 @@
this.unit = unit;
this.activeConnections = new LinkedList<ConnectionImpl>();
this.factory = factory;
- this.scheduler = scheduler;
+ this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
this.timeoutMS = unit.toMillis(interval) * 2;
this.minDelayMS = unit.toMillis(interval) / 2;
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void close() {
+ if (isClosed.compareAndSet(false, true)) {
+ synchronized (activeConnections) {
+ if (!activeConnections.isEmpty()) {
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format(
+ "HeartbeatConnectionFactory '%s' is closing while %d "
+ + "active connections remain", toString(),
+ activeConnections.size()));
+ }
+ }
+ }
+ scheduler.release();
+ factory.close();
+ }
+ }
+
@Override
public Connection getConnection() throws ErrorResultException {
return adaptConnection(factory.getConnection());
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> handler) {
@@ -967,9 +963,6 @@
return future;
}
- /**
- * {@inheritDoc}
- */
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
@@ -984,8 +977,8 @@
synchronized (activeConnections) {
connection.addConnectionEventListener(heartBeatConnection);
if (activeConnections.isEmpty()) {
- // This is the first active connection, so start the heart beat.
- heartBeatFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
+ /* This is the first active connection, so start the heart beat. */
+ heartBeatFuture = scheduler.get().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
final ConnectionImpl[] tmp;
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/InternalConnectionFactory.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/InternalConnectionFactory.java
index ceaac3c..f5cb757 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/InternalConnectionFactory.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/InternalConnectionFactory.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS.
+ * Portions copyright 2011-2013 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
@@ -50,9 +50,7 @@
* The type of client context.
*/
final class InternalConnectionFactory<C> implements ConnectionFactory {
-
private final ServerConnectionFactory<C, Integer> factory;
-
private final C clientContext;
InternalConnectionFactory(final ServerConnectionFactory<C, Integer> factory,
@@ -61,17 +59,16 @@
this.clientContext = clientContext;
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void close() {
+ // Nothing to do.
+ }
+
public Connection getConnection() throws ErrorResultException {
final ServerConnection<Integer> serverConnection = factory.handleAccept(clientContext);
return new InternalConnection(serverConnection);
}
- /**
- * {@inheritDoc}
- */
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> handler) {
final ServerConnection<Integer> serverConnection;
@@ -91,9 +88,6 @@
return new CompletedFutureResult<Connection>(connection);
}
- /**
- * {@inheritDoc}
- */
public String toString() {
final StringBuilder builder = new StringBuilder();
builder.append("InternalConnectionFactory(");
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java
index 0e7655d..043e489 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS.
+ * Portions copyright 2011-2013 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
@@ -39,9 +39,10 @@
* Server.
*/
public final class LDAPConnectionFactory implements ConnectionFactory {
- // We implement the factory using the pimpl idiom in order to avoid making
- // too many implementation classes public.
-
+ /*
+ * We implement the factory using the pimpl idiom in order to avoid making
+ * too many implementation classes public.
+ */
private final LDAPConnectionFactoryImpl impl;
/**
@@ -125,18 +126,17 @@
}
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void close() {
+ impl.close();
+ }
+
@Override
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> handler) {
return impl.getConnectionAsync(handler);
}
- /**
- * {@inheritDoc}
- */
@Override
public Connection getConnection() throws ErrorResultException {
return impl.getConnection();
@@ -183,9 +183,6 @@
return impl.getSocketAddress();
}
- /**
- * {@inheritDoc}
- */
@Override
public String toString() {
return impl.toString();
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
index 2b59286..9c0143d 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS.
+ * Portions copyright 2011-2013 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
@@ -37,28 +37,22 @@
final class LoadBalancer implements ConnectionFactory {
private final LoadBalancingAlgorithm algorithm;
- /**
- * Creates a new load balancer using the provided algorithm.
- *
- * @param algorithm
- * The load balancing algorithm which will be used to obtain the
- * next connection factory.
- */
- public LoadBalancer(final LoadBalancingAlgorithm algorithm) {
+ LoadBalancer(final LoadBalancingAlgorithm algorithm) {
Validator.ensureNotNull(algorithm);
this.algorithm = algorithm;
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void close() {
+ // Delegate to the algorithm.
+ algorithm.close();
+ }
+
+ @Override
public Connection getConnection() throws ErrorResultException {
return algorithm.getConnectionFactory().getConnection();
}
- /**
- * {@inheritDoc}
- */
@Override
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> resultHandler) {
@@ -76,9 +70,7 @@
return factory.getConnectionAsync(resultHandler);
}
- /**
- * {@inheritDoc}
- */
+ @Override
public String toString() {
final StringBuilder builder = new StringBuilder();
builder.append("LoadBalancer(");
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancingAlgorithm.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancingAlgorithm.java
index 9e2e9ec..ac30d2d 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancingAlgorithm.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancingAlgorithm.java
@@ -22,17 +22,28 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
+import java.io.Closeable;
+
/**
* A load balancing algorithm distributes connection requests across one or more
* underlying connection factories in an implementation defined manner.
*
* @see Connections#newLoadBalancer(LoadBalancingAlgorithm) newLoadBalancer
*/
-public interface LoadBalancingAlgorithm {
+public interface LoadBalancingAlgorithm extends Closeable {
+
+ /**
+ * Releases any resources associated with this algorithm, including any
+ * associated connection factories.
+ */
+ @Override
+ public void close();
+
/**
* Returns a connection factory which should be used in order to satisfy the
* next connection request.
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransportTestCase.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransportTestCase.java
index 7058267..4f6f844 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransportTestCase.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransportTestCase.java
@@ -22,11 +22,12 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2012 ForgeRock AS.
+ * Portions copyright 2012-2013 ForgeRock AS.
*/
package com.forgerock.opendj.ldap;
+import static com.forgerock.opendj.ldap.DefaultTCPNIOTransport.DEFAULT_TRANSPORT;
import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
import static org.testng.Assert.assertTrue;
@@ -36,6 +37,8 @@
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.testng.annotations.Test;
+import com.forgerock.opendj.util.ReferenceCountedObject;
+
/**
* Tests DefaultTCPNIOTransport class.
*/
@@ -52,9 +55,10 @@
@Test(enabled = false)
public void testGetInstance() throws Exception {
// Create a transport.
- final TCPNIOTransport transport = DefaultTCPNIOTransport.getInstance();
+ final ReferenceCountedObject<TCPNIOTransport>.Reference transport =
+ DEFAULT_TRANSPORT.acquire();
SocketAddress socketAddress = findFreeSocketAddress();
- transport.bind(socketAddress);
+ transport.get().bind(socketAddress);
// Establish a socket connection to see if the transport factory works.
final Socket socket = new Socket();
@@ -66,6 +70,7 @@
// Don't stop the transport because it is shared with the ldap server.
} finally {
socket.close();
+ transport.release();
}
}
}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/ReferenceCountedObjectTestCase.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/ReferenceCountedObjectTestCase.java
new file mode 100644
index 0000000..ec51fd9
--- /dev/null
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/ReferenceCountedObjectTestCase.java
@@ -0,0 +1,151 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions copyright 2013 ForgeRock AS.
+ */
+
+package com.forgerock.opendj.util;
+
+import static org.fest.assertions.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.testng.annotations.Test;
+
+/**
+ * This Test Class tests {@link ReferenceCountedObject}.
+ */
+@SuppressWarnings("javadoc")
+public class ReferenceCountedObjectTestCase extends UtilTestCase {
+
+ private interface Impl {
+ void destroyInstance(Object instance);
+
+ Object newInstance();
+ }
+
+ private final Object object = "Test Object";
+
+ @Test
+ public void testAcquire() throws Exception {
+ final Impl impl = mock(Impl.class);
+ when(impl.newInstance()).thenReturn(object);
+ final ReferenceCountedObject<Object> rco = rco(impl);
+
+ // First acquisition should create new instance.
+ final ReferenceCountedObject<Object>.Reference ref1 = rco.acquire();
+ assertThat(ref1.get()).isSameAs(object);
+ verify(impl).newInstance();
+ verifyNoMoreInteractions(impl);
+
+ // Second acquisition should just bump the ref count.
+ final ReferenceCountedObject<Object>.Reference ref2 = rco.acquire();
+ assertThat(ref2.get()).isSameAs(object);
+ verifyNoMoreInteractions(impl);
+
+ // First dereference should just decrease the ref count.
+ ref1.release();
+ verifyNoMoreInteractions(impl);
+
+ // Second dereference should destroy the instance.
+ ref2.release();
+ verify(impl).destroyInstance(object);
+ verifyNoMoreInteractions(impl);
+ }
+
+ @Test
+ public void testAcquireIfNull() throws Exception {
+ final Object otherObject = "Other object";
+ final Impl impl = mock(Impl.class);
+ when(impl.newInstance()).thenReturn(object);
+ final ReferenceCountedObject<Object> rco = rco(impl);
+ final ReferenceCountedObject<Object>.Reference ref = rco.acquireIfNull(otherObject);
+
+ verify(impl, never()).newInstance();
+ assertThat(ref.get()).isSameAs(otherObject);
+ ref.release();
+ verifyNoMoreInteractions(impl);
+ }
+
+ /**
+ * This test attempts to test that finalization works. It loops at most 100
+ * times performing GCs and checking to see if the finalizer was called.
+ * Usually objects are finalized after 2 GCs, so the loop should complete
+ * quite quickly.
+ *
+ * @throws Exception
+ * If an unexpected error occurred.
+ */
+ @Test
+ public void testFinalization() throws Exception {
+ final Impl impl = mock(Impl.class);
+ when(impl.newInstance()).thenReturn(object);
+ final ReferenceCountedObject<Object> rco = rco(impl);
+ ReferenceCountedObject<Object>.Reference ref = rco.acquire();
+ System.gc();
+ System.gc();
+ verify(impl, never()).destroyInstance(object);
+ // Read in order to prevent optimization.
+ if (ref != null) {
+ ref = null;
+ }
+ for (int i = 0; i < 100; i++) {
+ System.gc();
+ try {
+ verify(impl).destroyInstance(object);
+ break; // Finalized so stop.
+ } catch (final Throwable t) {
+ // Retry.
+ }
+ }
+ verify(impl).destroyInstance(object);
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testStaleReference() throws Exception {
+ final Impl impl = mock(Impl.class);
+ when(impl.newInstance()).thenReturn(object);
+ final ReferenceCountedObject<Object> rco = rco(impl);
+ final ReferenceCountedObject<Object>.Reference ref = rco.acquire();
+ ref.release();
+ ref.get();
+ }
+
+ private ReferenceCountedObject<Object> rco(final Impl impl) {
+ return new ReferenceCountedObject<Object>() {
+
+ @Override
+ protected void destroyInstance(final Object instance) {
+ impl.destroyInstance(instance);
+ }
+
+ @Override
+ protected Object newInstance() {
+ return impl.newInstance();
+ }
+ };
+ }
+}
diff --git a/opendj-sdk/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthenticatedConnectionFactory.java b/opendj-sdk/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthenticatedConnectionFactory.java
index 8a03be8..c15ef3c 100644
--- a/opendj-sdk/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthenticatedConnectionFactory.java
+++ b/opendj-sdk/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthenticatedConnectionFactory.java
@@ -67,10 +67,9 @@
* An authenticated connection supports all operations except Bind
* operations.
*/
- static final class AuthenticatedConnection extends AbstractConnectionWrapper {
+ static final class AuthenticatedConnection extends AbstractConnectionWrapper<Connection> {
private final BindRequest request;
-
private volatile BindResult result;
private AuthenticatedConnection(final Connection connection, final BindRequest request,
@@ -80,28 +79,19 @@
this.result = result;
}
- /**
+ /*
* Bind operations are not supported by pre-authenticated connections.
- * This method will always throw {@code UnsupportedOperationException}.
+ * These methods will always throw {@code UnsupportedOperationException}.
*/
- /**
- * {@inheritDoc}
- */
public BindResult bind(BindRequest request) throws ErrorResultException {
throw new UnsupportedOperationException();
}
- /**
- * {@inheritDoc}
- */
public BindResult bind(String name, char[] password) throws ErrorResultException {
throw new UnsupportedOperationException();
}
- /**
- * {@inheritDoc}
- */
public FutureResult<BindResult> bindAsync(BindRequest request,
IntermediateResponseHandler intermediateResponseHandler,
ResultHandler<? super BindResult> resultHandler) {
@@ -140,15 +130,19 @@
throw new UnsupportedOperationException();
}
- // Wrap the client handler so that we can update the connection
- // state.
+ /*
+ * Wrap the client handler so that we can update the connection
+ * state.
+ */
final ResultHandler<? super BindResult> clientHandler = handler;
final ResultHandler<BindResult> handlerWrapper = new ResultHandler<BindResult>() {
public void handleErrorResult(final ErrorResultException error) {
- // This connection is now unauthenticated so prevent
- // further use.
+ /*
+ * This connection is now unauthenticated so prevent further
+ * use.
+ */
connection.close();
if (clientHandler != null) {
@@ -170,9 +164,6 @@
return connection.bindAsync(request, null, handlerWrapper);
}
- /**
- * {@inheritDoc}
- */
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AuthenticatedConnection(");
@@ -255,9 +246,11 @@
this.request = request;
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void close() {
+ parentFactory.close();
+ }
+
public Connection getConnection() throws ErrorResultException {
final Connection connection = parentFactory.getConnection();
BindResult bindResult = null;
@@ -268,14 +261,14 @@
connection.close();
}
}
- // If the bind didn't succeed then an exception will have been thrown
- // and this line will not be reached.
+
+ /*
+ * If the bind didn't succeed then an exception will have been thrown
+ * and this line will not be reached.
+ */
return new AuthenticatedConnection(connection, request, bindResult);
}
- /**
- * {@inheritDoc}
- */
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> handler) {
final FutureResultImpl future = new FutureResultImpl(request, handler);
@@ -317,9 +310,6 @@
return this;
}
- /**
- * {@inheritDoc}
- */
public String toString() {
final StringBuilder builder = new StringBuilder();
builder.append("AuthenticatedConnectionFactory(");
diff --git a/opendj-sdk/opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPAuthnFilter.java b/opendj-sdk/opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPAuthnFilter.java
index ae8b537..6e6fbd5 100644
--- a/opendj-sdk/opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPAuthnFilter.java
+++ b/opendj-sdk/opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPAuthnFilter.java
@@ -104,17 +104,16 @@
private boolean supportHTTPBasicAuthentication = true;
private ServletApiVersionAdapter syncFactory;
- /**
- * {@inheritDoc}
- */
@Override
public void destroy() {
- // TODO: We should release any resources maintained by the filter, such as connection pools.
+ if (searchLDAPConnectionFactory != null) {
+ searchLDAPConnectionFactory.close();
+ }
+ if (bindLDAPConnectionFactory != null) {
+ bindLDAPConnectionFactory.close();
+ }
}
- /**
- * {@inheritDoc}
- */
@Override
public void doFilter(final ServletRequest request, final ServletResponse response,
final FilterChain chain) throws IOException, ServletException {
@@ -295,9 +294,6 @@
}
}
- /**
- * {@inheritDoc}
- */
@Override
public void init(final FilterConfig config) throws ServletException {
// FIXME: make it possible to configure the filter externally, especially
diff --git a/opendj-sdk/opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPConnectionFactoryProvider.java b/opendj-sdk/opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPConnectionFactoryProvider.java
index 276349b..3c402cf 100644
--- a/opendj-sdk/opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPConnectionFactoryProvider.java
+++ b/opendj-sdk/opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPConnectionFactoryProvider.java
@@ -15,6 +15,9 @@
*/
package org.forgerock.opendj.rest2ldap.servlet;
+import static org.forgerock.json.resource.Resources.newInternalConnectionFactory;
+import static org.forgerock.opendj.rest2ldap.Rest2LDAP.configureConnectionFactory;
+
import java.io.InputStream;
import java.util.Map;
@@ -25,8 +28,11 @@
import org.codehaus.jackson.map.ObjectMapper;
import org.forgerock.json.fluent.JsonValue;
import org.forgerock.json.resource.CollectionResourceProvider;
+import org.forgerock.json.resource.Connection;
import org.forgerock.json.resource.ConnectionFactory;
-import org.forgerock.json.resource.Resources;
+import org.forgerock.json.resource.FutureResult;
+import org.forgerock.json.resource.ResourceException;
+import org.forgerock.json.resource.ResultHandler;
import org.forgerock.json.resource.Router;
import org.forgerock.opendj.rest2ldap.AuthorizationPolicy;
import org.forgerock.opendj.rest2ldap.Rest2LDAP;
@@ -90,8 +96,8 @@
final org.forgerock.opendj.ldap.ConnectionFactory ldapFactory;
if (ldapFactoryName != null) {
ldapFactory =
- Rest2LDAP.configureConnectionFactory(configuration.get(
- "ldapConnectionFactories").required(), ldapFactoryName);
+ configureConnectionFactory(configuration.get("ldapConnectionFactories")
+ .required(), ldapFactoryName);
} else {
ldapFactory = null;
}
@@ -107,7 +113,33 @@
.configureMapping(mapping).build();
router.addRoute(mappingUrl, provider);
}
- return Resources.newInternalConnectionFactory(router);
+ final ConnectionFactory factory = newInternalConnectionFactory(router);
+ if (ldapFactory != null) {
+ /*
+ * Return a wrapper which will release resources associated with
+ * the LDAP connection factory (pooled connections, transport,
+ * etc).
+ */
+ return new ConnectionFactory() {
+ @Override
+ public FutureResult<Connection> getConnectionAsync(
+ ResultHandler<Connection> handler) {
+ return factory.getConnectionAsync(handler);
+ }
+
+ @Override
+ public Connection getConnection() throws ResourceException {
+ return factory.getConnection();
+ }
+
+ @Override
+ public void close() {
+ ldapFactory.close();
+ }
+ };
+ } else {
+ return factory;
+ }
} catch (final ServletException e) {
// Rethrow.
throw e;
diff --git a/opendj-sdk/opendj3/opendj-server2x-adapter/src/main/java/org/forgerock/opendj/adapter/server2x/Adapters.java b/opendj-sdk/opendj3/opendj-server2x-adapter/src/main/java/org/forgerock/opendj/adapter/server2x/Adapters.java
index c0ddac4..c8dadfb 100644
--- a/opendj-sdk/opendj3/opendj-server2x-adapter/src/main/java/org/forgerock/opendj/adapter/server2x/Adapters.java
+++ b/opendj-sdk/opendj3/opendj-server2x-adapter/src/main/java/org/forgerock/opendj/adapter/server2x/Adapters.java
@@ -145,6 +145,11 @@
ConnectionFactory factory = new ConnectionFactory() {
@Override
+ public void close() {
+ // Nothing to do.
+ }
+
+ @Override
public FutureResult<Connection> getConnectionAsync(
ResultHandler<? super Connection> handler) {
if (handler != null) {
--
Gitblit v1.10.0