opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransport.java
@@ -22,48 +22,64 @@ * * * Copyright 2010 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS * Portions copyright 2011-2013 ForgeRock AS */ package com.forgerock.opendj.ldap; import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG; import java.io.IOException; import java.util.logging.Level; import org.glassfish.grizzly.nio.transport.TCPNIOTransport; import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder; import org.glassfish.grizzly.strategies.SameThreadIOStrategy; import org.glassfish.grizzly.strategies.WorkerThreadIOStrategy; import com.forgerock.opendj.util.ReferenceCountedObject; /** * The default {@link TCPNIOTransport} which all {@code LDAPConnectionFactory}s * and {@code LDAPListener}s will use unless otherwise specified in their * options. */ final class DefaultTCPNIOTransport { private static TCPNIOTransport defaultTransport = null; final class DefaultTCPNIOTransport extends ReferenceCountedObject<TCPNIOTransport> { static final DefaultTCPNIOTransport DEFAULT_TRANSPORT = new DefaultTCPNIOTransport(); /** * Returns the default {@link TCPNIOTransport} which all * {@code LDAPConnectionFactory}s and {@code LDAPListener}s will use unless * otherwise specified in their options. * * @return The default {@link TCPNIOTransport}. */ static synchronized TCPNIOTransport getInstance() { if (defaultTransport == null) { private DefaultTCPNIOTransport() { // Prevent instantiation. } @Override protected void destroyInstance(final TCPNIOTransport instance) { try { instance.stop(); } catch (final IOException e) { DEBUG_LOG.log(Level.WARNING, "An error occurred while shutting down the Grizzly transport", e.getMessage()); } } @Override protected TCPNIOTransport newInstance() { final TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance(); // Determine which threading strategy to use, and total number of // threads. /* * Determine which threading strategy to use, and total number of * threads. */ final String useWorkerThreadsStr = System.getProperty("org.forgerock.opendj.transport.useWorkerThreads"); final boolean useWorkerThreadStrategy; if (useWorkerThreadsStr != null) { useWorkerThreadStrategy = Boolean.parseBoolean(useWorkerThreadsStr); } else { // The most best performing strategy to use is the // SameThreadIOStrategy, however it can only be used in cases // where result listeners will not block. /* * The most best performing strategy to use is the * SameThreadIOStrategy, however it can only be used in cases where * result listeners will not block. */ useWorkerThreadStrategy = true; } @@ -77,26 +93,22 @@ final int cpus = Runtime.getRuntime().availableProcessors(); // Calculate the number of selector threads. final String selectorsStr = System.getProperty("org.forgerock.opendj.transport.selectors"); final String selectorsStr = System.getProperty("org.forgerock.opendj.transport.selectors"); final int selectorThreadCount; if (selectorsStr != null) { selectorThreadCount = Integer.parseInt(selectorsStr); } else { selectorThreadCount = useWorkerThreadStrategy ? Math.max(2, cpus / 4) : Math.max(5, (cpus / 2) - 1); useWorkerThreadStrategy ? Math.max(2, cpus / 4) : Math.max(5, (cpus / 2) - 1); } builder.getSelectorThreadPoolConfig().setCorePoolSize(selectorThreadCount) .setMaxPoolSize(selectorThreadCount).setPoolName( "OpenDJ LDAP SDK Grizzly selector thread"); builder.getSelectorThreadPoolConfig().setCorePoolSize(selectorThreadCount).setMaxPoolSize( selectorThreadCount).setPoolName("OpenDJ LDAP SDK Grizzly selector thread"); // Calculate the number of worker threads. if (builder.getWorkerThreadPoolConfig() != null) { final String workersStr = System.getProperty("org.forgerock.opendj.transport.workers"); final String workersStr = System.getProperty("org.forgerock.opendj.transport.workers"); final int workerThreadCount; if (workersStr != null) { @@ -105,9 +117,8 @@ workerThreadCount = useWorkerThreadStrategy ? Math.max(5, (cpus * 2)) : 0; } builder.getWorkerThreadPoolConfig().setCorePoolSize(workerThreadCount) .setMaxPoolSize(workerThreadCount).setPoolName( "OpenDJ LDAP SDK Grizzly worker thread"); builder.getWorkerThreadPoolConfig().setCorePoolSize(workerThreadCount).setMaxPoolSize( workerThreadCount).setPoolName("OpenDJ LDAP SDK Grizzly worker thread"); } // Parse IO related options. @@ -131,24 +142,19 @@ builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr)); } defaultTransport = builder.build(); final TCPNIOTransport transport = builder.build(); // FIXME: raise bug in Grizzly. We should not need to do this, but // failure to do so causes many deadlocks. defaultTransport.setSelectorRunnersCount(selectorThreadCount); transport.setSelectorRunnersCount(selectorThreadCount); try { defaultTransport.start(); transport.start(); } catch (final IOException e) { throw new RuntimeException(e); } } return defaultTransport; } private DefaultTCPNIOTransport() { // Prevent instantiation. return transport; } } opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java
@@ -513,7 +513,6 @@ } void registerConnection(final Connection<?> connection, final LDAPConnection ldapConnection) { TimeoutChecker.INSTANCE.addConnection(ldapConnection); LDAP_CONNECTION_ATTR.set(connection, ldapConnection); } } opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -27,6 +27,7 @@ package com.forgerock.opendj.ldap; import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG; import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult; import java.io.IOException; @@ -81,7 +82,6 @@ import org.glassfish.grizzly.ssl.SSLFilter; import com.forgerock.opendj.util.CompletedFutureResult; import com.forgerock.opendj.util.StaticUtils; import com.forgerock.opendj.util.Validator; /** @@ -109,7 +109,7 @@ private final org.glassfish.grizzly.Connection<?> connection; private final LDAPWriter ldapWriter = new LDAPWriter(); private final AtomicInteger nextMsgID = new AtomicInteger(1); private final LDAPOptions options; private final LDAPConnectionFactoryImpl factory; private final ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>> pendingRequests = new ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>>(); private final Object stateLock = new Object(); @@ -120,22 +120,12 @@ private boolean isFailed = false; private List<ConnectionEventListener> listeners = null; /** * Creates a new LDAP connection. * * @param connection * The Grizzly connection. * @param options * The LDAP client options. */ LDAPConnection(final org.glassfish.grizzly.Connection<?> connection, final LDAPOptions options) { LDAPConnection(final org.glassfish.grizzly.Connection<?> connection, final LDAPConnectionFactoryImpl factory) { this.connection = connection; this.options = options; this.factory = factory; } /** * {@inheritDoc} */ @Override public FutureResult<Void> abandonAsync(final AbandonRequest request) { final AbstractLDAPFutureResultImpl<?> pendingRequest; @@ -148,9 +138,11 @@ pendingRequest = pendingRequests.remove(request.getRequestID()); } if (pendingRequest == null) { // There has never been a request with the specified message ID or // the response has already been received and handled. We can ignore // this abandon request. /* * There has never been a request with the specified message ID * or the response has already been received and handled. We can * ignore this abandon request. */ // Message ID will be -1 since no request was sent. return new CompletedFutureResult<Void>((Void) null); @@ -173,9 +165,6 @@ } } /** * {@inheritDoc} */ @Override public FutureResult<Result> addAsync(final AddRequest request, final IntermediateResponseHandler intermediateResponseHandler, @@ -208,9 +197,6 @@ return future; } /** * {@inheritDoc} */ @Override public void addConnectionEventListener(final ConnectionEventListener listener) { Validator.ensureNotNull(listener); @@ -236,9 +222,6 @@ } } /** * {@inheritDoc} */ @Override public FutureResult<BindResult> bindAsync(final BindRequest request, final IntermediateResponseHandler intermediateResponseHandler, @@ -308,9 +291,6 @@ return future; } /** * {@inheritDoc} */ @Override public void close(final UnbindRequest request, final String reason) { // FIXME: I18N need to internationalize this message. @@ -321,9 +301,6 @@ "Connection closed by client" + (reason != null ? ": " + reason : ""))); } /** * {@inheritDoc} */ @Override public FutureResult<CompareResult> compareAsync(final CompareRequest request, final IntermediateResponseHandler intermediateResponseHandler, @@ -356,9 +333,6 @@ return future; } /** * {@inheritDoc} */ @Override public FutureResult<Result> deleteAsync(final DeleteRequest request, final IntermediateResponseHandler intermediateResponseHandler, @@ -391,9 +365,6 @@ return future; } /** * {@inheritDoc} */ @Override public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync( final ExtendedRequest<R> request, @@ -447,9 +418,6 @@ return future; } /** * {@inheritDoc} */ @Override public boolean isClosed() { synchronized (stateLock) { @@ -457,9 +425,6 @@ } } /** * {@inheritDoc} */ @Override public boolean isValid() { synchronized (stateLock) { @@ -467,9 +432,6 @@ } } /** * {@inheritDoc} */ @Override public FutureResult<Result> modifyAsync(final ModifyRequest request, final IntermediateResponseHandler intermediateResponseHandler, @@ -502,9 +464,6 @@ return future; } /** * {@inheritDoc} */ @Override public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request, final IntermediateResponseHandler intermediateResponseHandler, @@ -537,9 +496,6 @@ return future; } /** * {@inheritDoc} */ @Override public void removeConnectionEventListener(final ConnectionEventListener listener) { Validator.ensureNotNull(listener); @@ -550,9 +506,6 @@ } } /** * {@inheritDoc} */ @Override public FutureResult<Result> searchAsync(final SearchRequest request, final IntermediateResponseHandler intermediateResponseHandler, @@ -585,9 +538,6 @@ return future; } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); @@ -600,7 +550,7 @@ } long cancelExpiredRequests(final long currentTime) { final long timeout = options.getTimeout(TimeUnit.MILLISECONDS); final long timeout = factory.getLDAPOptions().getTimeout(TimeUnit.MILLISECONDS); long delay = timeout; if (timeout > 0) { for (final int requestID : pendingRequests.keySet()) { @@ -608,10 +558,9 @@ if (future != null) { final long diff = (future.getTimestamp() + timeout) - currentTime; if (diff <= 0 && pendingRequests.remove(requestID) != null) { StaticUtils.DEBUG_LOG.fine("Cancelling expired future result: " + future); DEBUG_LOG.fine("Cancelling expired future result: " + future); final Result result = Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT); future.adaptErrorResult(result); abandonAsync(Requests.newAbandonRequest(future.getRequestID())); } else { delay = Math.min(delay, diff); @@ -691,7 +640,7 @@ // Underlying channel prob blown up. Just ignore. } } TimeoutChecker.INSTANCE.removeConnection(this); factory.getTimeoutChecker().removeConnection(this); connection.closeSilently(); // Notify listeners. @@ -721,7 +670,7 @@ } LDAPOptions getLDAPOptions() { return options; return factory.getLDAPOptions(); } AbstractLDAPFutureResultImpl<?> getPendingRequest(final Integer messageID) { @@ -837,12 +786,14 @@ private void checkConnectionIsValid() throws ErrorResultException { if (!isValid0()) { if (failedDueToDisconnect) { // Connection termination was triggered remotely. We don't want // to blindly pass on the result code to requests since it could // be confused for a genuine response. For example, if the // disconnect contained the invalidCredentials result code then // this could be misinterpreted as a genuine authentication // failure for subsequent bind requests. /* * Connection termination was triggered remotely. We don't want * to blindly pass on the result code to requests since it could * be confused for a genuine response. For example, if the * disconnect contained the invalidCredentials result code then * this could be misinterpreted as a genuine authentication * failure for subsequent bind requests. */ throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, "Connection closed by server"); } else { opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
@@ -27,11 +27,14 @@ package com.forgerock.opendj.ldap; import static com.forgerock.opendj.ldap.DefaultTCPNIOTransport.DEFAULT_TRANSPORT; import static com.forgerock.opendj.ldap.TimeoutChecker.TIMEOUT_CHECKER; import static org.forgerock.opendj.ldap.ErrorResultException.*; import java.io.IOException; import java.net.SocketAddress; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLEngine; @@ -55,6 +58,7 @@ import org.glassfish.grizzly.nio.transport.TCPNIOTransport; import com.forgerock.opendj.util.AsynchronousFutureResult; import com.forgerock.opendj.util.ReferenceCountedObject; /** * LDAP connection factory implementation. @@ -154,10 +158,14 @@ } private LDAPConnection adaptConnection(final org.glassfish.grizzly.Connection<?> connection) { // Test shows that its much faster with non block writes but risk // running out of memory if the server is slow. /* * Test shows that its much faster with non block writes but risk * running out of memory if the server is slow. */ connection.configureBlocking(true); final LDAPConnection ldapConnection = new LDAPConnection(connection, options); final LDAPConnection ldapConnection = new LDAPConnection(connection, LDAPConnectionFactoryImpl.this); timeoutChecker.get().addConnection(ldapConnection); clientFilter.registerConnection(connection, ldapConnection); return ldapConnection; } @@ -194,7 +202,10 @@ private final FilterChain defaultFilterChain; private final LDAPOptions options; private final SocketAddress socketAddress; private final TCPNIOTransport transport; private final ReferenceCountedObject<TCPNIOTransport>.Reference transport; private final AtomicBoolean isClosed = new AtomicBoolean(); private final ReferenceCountedObject<TimeoutChecker>.Reference timeoutChecker = TIMEOUT_CHECKER .acquire(); /** * Creates a new LDAP connection factory implementation which can be used to @@ -207,11 +218,7 @@ * The LDAP connection options to use when creating connections. */ public LDAPConnectionFactoryImpl(final SocketAddress address, final LDAPOptions options) { if (options.getTCPNIOTransport() == null) { this.transport = DefaultTCPNIOTransport.getInstance(); } else { this.transport = options.getTCPNIOTransport(); } this.transport = DEFAULT_TRANSPORT.acquireIfNull(options.getTCPNIOTransport()); this.socketAddress = address; this.options = new LDAPOptions(options); this.clientFilter = @@ -220,9 +227,14 @@ FilterChainBuilder.stateless().add(new TransportFilter()).add(clientFilter).build(); } /** * {@inheritDoc} */ @Override public void close() { if (isClosed.compareAndSet(false, true)) { transport.release(); timeoutChecker.release(); } } @Override public Connection getConnection() throws ErrorResultException { try { @@ -232,14 +244,12 @@ } } /** * {@inheritDoc} */ @Override public FutureResult<Connection> getConnectionAsync( final ResultHandler<? super Connection> handler) { final SocketConnectorHandler connectorHandler = TCPNIOConnectorHandler.builder(transport).processor(defaultFilterChain).build(); TCPNIOConnectorHandler.builder(transport.get()).processor(defaultFilterChain) .build(); final AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> future = new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(handler); final CompletionHandlerAdapter cha = new CompletionHandlerAdapter(future); @@ -256,9 +266,14 @@ return socketAddress; } /** * {@inheritDoc} */ TimeoutChecker getTimeoutChecker() { return timeoutChecker.get(); } LDAPOptions getLDAPOptions() { return options; } @Override public String toString() { final StringBuilder builder = new StringBuilder(); opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPListenerImpl.java
@@ -22,14 +22,18 @@ * * * Copyright 2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS * Portions copyright 2011-2013 ForgeRock AS */ package com.forgerock.opendj.ldap; import static com.forgerock.opendj.ldap.DefaultTCPNIOTransport.DEFAULT_TRANSPORT; import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG; import java.io.Closeable; import java.io.IOException; import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import org.forgerock.opendj.ldap.DecodeOptions; @@ -43,16 +47,17 @@ import org.glassfish.grizzly.nio.transport.TCPNIOServerConnection; import org.glassfish.grizzly.nio.transport.TCPNIOTransport; import com.forgerock.opendj.util.StaticUtils; import com.forgerock.opendj.util.ReferenceCountedObject; /** * LDAP listener implementation. */ public final class LDAPListenerImpl implements Closeable { private final TCPNIOTransport transport; private final ReferenceCountedObject<TCPNIOTransport>.Reference transport; private final FilterChain defaultFilterChain; private final ServerConnectionFactory<LDAPClientContext, Integer> connectionFactory; private final TCPNIOServerConnection serverConnection; private final AtomicBoolean isClosed = new AtomicBoolean(); /** * Creates a new LDAP listener implementation which will listen for LDAP @@ -72,11 +77,7 @@ public LDAPListenerImpl(final SocketAddress address, final ServerConnectionFactory<LDAPClientContext, Integer> factory, final LDAPListenerOptions options) throws IOException { if (options.getTCPNIOTransport() == null) { this.transport = DefaultTCPNIOTransport.getInstance(); } else { this.transport = options.getTCPNIOTransport(); } this.transport = DEFAULT_TRANSPORT.acquireIfNull(options.getTCPNIOTransport()); this.connectionFactory = factory; final DecodeOptions decodeOptions = new DecodeOptions(options.getDecodeOptions()); @@ -85,26 +86,22 @@ new LDAPServerFilter(this, new LDAPReader(decodeOptions), options .getMaxRequestSize())).build(); final TCPNIOBindingHandler bindingHandler = TCPNIOBindingHandler.builder(transport).processor(defaultFilterChain).build(); TCPNIOBindingHandler.builder(transport.get()).processor(defaultFilterChain).build(); this.serverConnection = bindingHandler.bind(address, options.getBacklog()); } /** * {@inheritDoc} */ @Override public void close() { if (isClosed.compareAndSet(false, true)) { try { serverConnection.close().get(); } catch (final InterruptedException e) { // Cannot handle here. Thread.currentThread().interrupt(); } catch (final Exception e) { // Ignore the exception. if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG.log(Level.WARNING, "Exception occurred while closing listener:" + e.getMessage(), e); DEBUG_LOG.log(Level.WARNING, "Exception occurred while closing listener", e); } transport.release(); } } @@ -117,9 +114,7 @@ return serverConnection.getLocalAddress(); } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("LDAPListener("); opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java
@@ -22,47 +22,64 @@ * * * Copyright 2010 Sun Microsystems, Inc. * Portions copyright 2013 ForgeRock AS. */ package com.forgerock.opendj.ldap; import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import org.glassfish.grizzly.utils.LinkedTransferQueue; import com.forgerock.opendj.util.StaticUtils; import com.forgerock.opendj.util.ReferenceCountedObject; /** * Checks connection for pending requests that have timed out. */ final class TimeoutChecker { static final TimeoutChecker INSTANCE = new TimeoutChecker(); static final ReferenceCountedObject<TimeoutChecker> TIMEOUT_CHECKER = new ReferenceCountedObject<TimeoutChecker>() { @Override protected void destroyInstance(final TimeoutChecker instance) { instance.shutdown(); } private final LinkedTransferQueue<LDAPConnection> connections; private transient final ReentrantLock lock; private transient final Condition available; @Override protected TimeoutChecker newInstance() { return new TimeoutChecker(); } }; private final Condition available; private final List<LDAPConnection> connections; private final ReentrantLock lock; private boolean shutdownRequested = false; private TimeoutChecker() { this.connections = new LinkedTransferQueue<LDAPConnection>(); this.connections = new LinkedList<LDAPConnection>(); this.lock = new ReentrantLock(); this.available = lock.newCondition(); final Thread checkerThread = new Thread("Timeout Checker") { final Thread checkerThread = new Thread("OpenDJ LDAP SDK Connection Timeout Checker") { @Override public void run() { StaticUtils.DEBUG_LOG.fine("Timeout Checker Starting"); final ReentrantLock lock = TimeoutChecker.this.lock; DEBUG_LOG.fine("Timeout Checker Starting"); lock.lock(); try { while (true) { while (!shutdownRequested) { final long currentTime = System.currentTimeMillis(); long delay = 0; for (final LDAPConnection connection : connections) { StaticUtils.DEBUG_LOG.finer("Checking connection " + connection + " delay = " + delay); if (DEBUG_LOG.isLoggable(Level.FINER)) { DEBUG_LOG.finer("Checking connection " + connection + " delay = " + delay); } final long newDelay = connection.cancelExpiredRequests(currentTime); if (newDelay > 0) { if (delay > 0) { @@ -75,15 +92,17 @@ try { if (delay <= 0) { StaticUtils.DEBUG_LOG.finer("There are no connections with " DEBUG_LOG.finer("There are no connections with " + "timeout specified. Sleeping"); available.await(); } else { StaticUtils.DEBUG_LOG.finer("Sleeping for " + delay + "ms"); if (DEBUG_LOG.isLoggable(Level.FINER)) { DEBUG_LOG.log(Level.FINER, "Sleeping for " + delay + " ms"); } available.await(delay, TimeUnit.MILLISECONDS); } } catch (final InterruptedException e) { // Just go around again. shutdownRequested = true; } } } finally { @@ -97,7 +116,6 @@ } void addConnection(final LDAPConnection connection) { final ReentrantLock lock = this.lock; lock.lock(); try { connections.add(connection); @@ -108,7 +126,6 @@ } void removeConnection(final LDAPConnection connection) { final ReentrantLock lock = this.lock; lock.lock(); try { connections.remove(connection); @@ -116,4 +133,14 @@ lock.unlock(); } } private void shutdown() { lock.lock(); try { shutdownRequested = true; available.signalAll(); } finally { lock.unlock(); } } } opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/ReferenceCountedObject.java
New file @@ -0,0 +1,162 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2013 ForgeRock AS. */ package com.forgerock.opendj.util; /** * An object which is lazily created when first referenced, and destroyed when * the last reference is released. * * @param <T> * The type of referenced object. */ public abstract class ReferenceCountedObject<T> { /** * A reference to the reference counted object which will automatically be * released during garbage collection. */ public final class Reference { private T value; private Reference(final T value) { this.value = value; } /** * Returns the referenced object. * * @return The referenced object. * @throws NullPointerException * If the referenced object has already been released. */ public T get() { if (value == null) { throw new NullPointerException(); // Fail-fast. } return value; } /** * Decrements the reference count for the reference counted object if * this reference refers to the reference counted instance. If the * reference count drops to zero then the referenced object will be * destroyed. */ public void release() { releaseIfSame(value); /* * Force NPE for subsequent get() attempts and prevent multiple * releases. */ value = null; } /** * Provide a finalizer because reference counting is intended for * expensive rarely created resources which should not be accidentally * left around. */ @Override protected void finalize() { release(); } } private T instance = null; private final Object lock = new Object(); private int refCount = 0; /** * Creates a new referenced object whose reference count is initially zero. */ protected ReferenceCountedObject() { // Nothing to do. } /** * Returns a reference to the reference counted object. * * @return A reference to the reference counted object. */ public final Reference acquire() { synchronized (lock) { if (refCount++ == 0) { assert instance == null; instance = newInstance(); } return new Reference(instance); } } /** * Returns a reference to the provided object or, if it is {@code null}, a * reference to the reference counted object. * * @param value * The object to be referenced, or {@code null} if the reference * counted object should be used. * @return A reference to the provided object or, if it is {@code null}, a * reference to the reference counted object. */ public final Reference acquireIfNull(final T value) { return value != null ? new Reference(value) : acquire(); } /** * Invoked when a reference is released and the reference count will become * zero. Implementations should release any resources associated with the * resource and should not return until the resources have been released. * * @param instance * The instance to be destroyed. */ protected abstract void destroyInstance(T instance); /** * Invoked when a reference is acquired and the current reference count is * zero. Implementations should create a new instance as fast as possible. * * @return The new instance. */ protected abstract T newInstance(); private final void releaseIfSame(final T instance) { T instanceToRelease = null; synchronized (lock) { if (this.instance == instance) { if (--refCount == 0) { instanceToRelease = instance; this.instance = null; } } } if (instanceToRelease != null) { destroyInstance(instanceToRelease); } } } opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/StaticUtils.java
@@ -22,7 +22,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS * Portions copyright 2011-2013 ForgeRock AS */ package com.forgerock.opendj.util; @@ -48,6 +48,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -86,9 +87,30 @@ // UTC TimeZone is assumed to never change over JVM lifetime private static final TimeZone TIME_ZONE_UTC_OBJ = TimeZone.getTimeZone(TIME_ZONE_UTC); private static ScheduledExecutorService defaultScheduler = null; /** * The default scheduler which should be used when the application does not * provide one. */ public static final ReferenceCountedObject<ScheduledExecutorService> DEFAULT_SCHEDULER = new ReferenceCountedObject<ScheduledExecutorService>() { private static final Object DEFAULT_SCHEDULER_LOCK = new Object(); @Override protected ScheduledExecutorService newInstance() { final ThreadFactory factory = newThreadFactory(null, "OpenDJ LDAP SDK Default Scheduler", true); return Executors.newSingleThreadScheduledExecutor(factory); } @Override protected void destroyInstance(ScheduledExecutorService instance) { instance.shutdown(); try { instance.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }; /** * Retrieves a string representation of the provided byte in hexadecimal. @@ -1395,22 +1417,6 @@ } /** * Returns the default scheduler which should be used by the SDK. * * @return The default scheduler. */ public static ScheduledExecutorService getDefaultScheduler() { synchronized (DEFAULT_SCHEDULER_LOCK) { if (defaultScheduler == null) { final ThreadFactory factory = newThreadFactory(null, "OpenDJ SDK Default Scheduler", true); defaultScheduler = Executors.newSingleThreadScheduledExecutor(factory); } } return defaultScheduler; } /** * Retrieves the best human-readable message for the provided exception. For * exceptions defined in the OpenDJ project, it will attempt to use the * message (combining it with the message ID if available). For some opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractConnectionWrapper.java
@@ -55,12 +55,15 @@ * An abstract base class from which connection wrappers may be easily * implemented. The default implementation of each method is to delegate to the * wrapped connection. * * @param <C> * The type of wrapped connection. */ public abstract class AbstractConnectionWrapper implements Connection { public abstract class AbstractConnectionWrapper<C extends Connection> implements Connection { /** * The wrapped connection. */ protected final Connection connection; protected final C connection; /** * Creates a new connection wrapper. @@ -68,7 +71,7 @@ * @param connection * The connection to be wrapped. */ protected AbstractConnectionWrapper(final Connection connection) { protected AbstractConnectionWrapper(final C connection) { Validator.ensureNotNull(connection); this.connection = connection; } opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
@@ -22,11 +22,13 @@ * * * Copyright 2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS. * Portions copyright 2011-2013 ForgeRock AS. */ package org.forgerock.opendj.ldap; import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG; import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER; import static org.forgerock.opendj.ldap.ErrorResultException.*; import java.util.ArrayList; @@ -39,7 +41,7 @@ import java.util.logging.Level; import com.forgerock.opendj.util.AsynchronousFutureResult; import com.forgerock.opendj.util.StaticUtils; import com.forgerock.opendj.util.ReferenceCountedObject; import com.forgerock.opendj.util.Validator; /** @@ -55,11 +57,8 @@ ResultHandler<Connection> { private final ConnectionFactory factory; private final AtomicBoolean isOperational = new AtomicBoolean(true); private volatile FutureResult<?> pendingConnectFuture = null; private final int index; private MonitoredConnectionFactory(final ConnectionFactory factory, final int index) { @@ -67,9 +66,12 @@ this.index = index; } /** * {@inheritDoc} */ @Override public void close() { // Should we cancel the future? factory.close(); } @Override public Connection getConnection() throws ErrorResultException { final Connection connection; @@ -87,14 +89,12 @@ return connection; } /** * {@inheritDoc} */ @Override public FutureResult<Connection> getConnectionAsync( final ResultHandler<? super Connection> resultHandler) { final AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> future = new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(resultHandler); new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>( resultHandler); final ResultHandler<Connection> failoverHandler = new ResultHandler<Connection>() { @Override @@ -141,9 +141,6 @@ connection.close(); } /** * {@inheritDoc} */ @Override public String toString() { return factory.toString(); @@ -156,9 +153,9 @@ private synchronized void checkIfAvailable() { if (!isOperational.get() && (pendingConnectFuture == null || pendingConnectFuture.isDone())) { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String .format("Attempting reconnect to offline factory " + this)); if (DEBUG_LOG.isLoggable(Level.FINE)) { DEBUG_LOG.fine(String.format("Attempting reconnect to offline factory '%s'", this)); } pendingConnectFuture = factory.getConnectionAsync(this); } @@ -167,21 +164,22 @@ private void notifyOffline(final ErrorResultException error) { if (isOperational.getAndSet(false)) { // Transition from online to offline. if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG.warning(String.format("Connection factory " + factory + " is no longer operational: " + error.getMessage())); if (DEBUG_LOG.isLoggable(Level.WARNING)) { DEBUG_LOG.warning(String.format( "Connection factory '%s' is no longer operational: %s", factory, error .getMessage())); } synchronized (stateLock) { offlineFactoriesCount++; if (offlineFactoriesCount == 1) { // Enable monitoring. if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format("Starting monitoring thread")); if (DEBUG_LOG.isLoggable(Level.FINE)) { DEBUG_LOG.fine(String.format("Starting monitoring thread")); } monitoringFuture = scheduler.scheduleWithFixedDelay(new MonitorRunnable(), 0, scheduler.get().scheduleWithFixedDelay(new MonitorRunnable(), 0, monitoringInterval, monitoringIntervalTimeUnit); } } @@ -191,16 +189,16 @@ private void notifyOnline() { if (!isOperational.getAndSet(true)) { // Transition from offline to online. if (StaticUtils.DEBUG_LOG.isLoggable(Level.INFO)) { StaticUtils.DEBUG_LOG.info(String.format("Connection factory " + factory + " is now operational")); if (DEBUG_LOG.isLoggable(Level.INFO)) { DEBUG_LOG.info(String.format("Connection factory'%s' is now operational", factory)); } synchronized (stateLock) { offlineFactoriesCount--; if (offlineFactoriesCount == 0) { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format("Stopping monitoring thread")); if (DEBUG_LOG.isLoggable(Level.FINE)) { DEBUG_LOG.fine(String.format("Stopping monitoring thread")); } monitoringFuture.cancel(false); @@ -225,91 +223,65 @@ } private final List<MonitoredConnectionFactory> monitoredFactories; private final ScheduledExecutorService scheduler; private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler; private final Object stateLock = new Object(); // Guarded by stateLock. private int offlineFactoriesCount = 0; private final long monitoringInterval; private final TimeUnit monitoringIntervalTimeUnit; // Guarded by stateLock. private ScheduledFuture<?> monitoringFuture; /** * Creates a new abstract load balancing algorithm which will monitor * offline connection factories every second using the default scheduler. * * @param factories * The connection factories. * Guarded by stateLock. */ private int offlineFactoriesCount = 0; private final long monitoringInterval; private final TimeUnit monitoringIntervalTimeUnit; /** * Guarded by stateLock. */ private ScheduledFuture<?> monitoringFuture; private AtomicBoolean isClosed = new AtomicBoolean(); AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories) { this(factories, 1, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler()); this(factories, 1, TimeUnit.SECONDS, null); } /** * Creates a new abstract load balancing algorithm which will monitor * offline connection factories using the specified frequency using the * default scheduler. * * @param factories * The connection factories. * @param interval * The interval between attempts to poll offline factories. * @param unit * The time unit for the interval between attempts to poll * offline factories. */ AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories, final long interval, final TimeUnit unit) { this(factories, interval, unit, StaticUtils.getDefaultScheduler()); this(factories, interval, unit, null); } /** * Creates a new abstract load balancing algorithm which will monitor * offline connection factories using the specified frequency and scheduler. * * @param factories * The connection factories. * @param interval * The interval between attempts to poll offline factories. * @param unit * The time unit for the interval between attempts to poll * offline factories. * @param scheduler * The scheduler which should for periodically monitoring dead * connection factories to see if they are usable again. */ AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories, final long interval, final TimeUnit unit, final ScheduledExecutorService scheduler) { Validator.ensureNotNull(factories, scheduler, unit); Validator.ensureNotNull(factories, unit); this.monitoredFactories = new ArrayList<MonitoredConnectionFactory>(factories.size()); int i = 0; for (final ConnectionFactory f : factories) { this.monitoredFactories.add(new MonitoredConnectionFactory(f, i++)); } this.scheduler = scheduler; this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler); this.monitoringInterval = interval; this.monitoringIntervalTimeUnit = unit; } /** * {@inheritDoc} */ @Override public void close() { if (isClosed.compareAndSet(false, true)) { synchronized (stateLock) { if (monitoringFuture != null) { monitoringFuture.cancel(false); monitoringFuture = null; } } for (ConnectionFactory factory : monitoredFactories) { factory.close(); } scheduler.release(); } } @Override public final ConnectionFactory getConnectionFactory() throws ErrorResultException { final int index = getInitialConnectionFactoryIndex(); return getMonitoredConnectionFactory(index); } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); @@ -357,9 +329,11 @@ index = (index + 1) % maxIndex; } while (index != initialIndex); // All factories are offline so give up. We could have a // configurable policy here such as waiting indefinitely, or for a // configurable timeout period. /* * All factories are offline so give up. We could have a configurable * policy here such as waiting indefinitely, or for a configurable * timeout period. */ throw newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR, "No operational connection factories available"); } opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AuthenticatedConnectionFactory.java
@@ -22,7 +22,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS. * Portions copyright 2011-2013 ForgeRock AS. */ package org.forgerock.opendj.ldap; @@ -51,43 +51,34 @@ * An authenticated connection supports all operations except Bind * operations. */ public static final class AuthenticatedConnection extends AbstractConnectionWrapper { public static final class AuthenticatedConnection extends AbstractConnectionWrapper<Connection> { private AuthenticatedConnection(final Connection connection) { super(connection); } /** /* * Bind operations are not supported by pre-authenticated connections. * This method will always throw {@code UnsupportedOperationException}. * These methods will always throw {@code UnsupportedOperationException}. */ /** * {@inheritDoc} */ public FutureResult<BindResult> bindAsync(final BindRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler<? super BindResult> resultHandler) { throw new UnsupportedOperationException(); } /** * {@inheritDoc} */ public BindResult bind(BindRequest request) throws ErrorResultException { throw new UnsupportedOperationException(); } /** * {@inheritDoc} */ public BindResult bind(String name, char[] password) throws ErrorResultException { throw new UnsupportedOperationException(); } /** * {@inheritDoc} */ public String toString() { StringBuilder builder = new StringBuilder(); builder.append("AuthenticatedConnection("); @@ -100,11 +91,8 @@ private static final class FutureResultImpl { private final FutureResultTransformer<BindResult, Connection> futureBindResult; private final RecursiveFutureResult<Connection, BindResult> futureConnectionResult; private final BindRequest bindRequest; private Connection connection; private FutureResultImpl(final BindRequest request, @@ -148,7 +136,6 @@ } private final BindRequest request; private final ConnectionFactory parentFactory; /** @@ -169,9 +156,12 @@ this.request = request; } /** * {@inheritDoc} */ @Override public void close() { // Delegate. parentFactory.close(); } public Connection getConnection() throws ErrorResultException { final Connection connection = parentFactory.getConnection(); boolean bindSucceeded = false; @@ -183,14 +173,15 @@ connection.close(); } } // If the bind didn't succeed then an exception will have been thrown // and this line will not be reached. /* * If the bind didn't succeed then an exception will have been thrown * and this line will not be reached. */ return new AuthenticatedConnection(connection); } /** * {@inheritDoc} */ @Override public FutureResult<Connection> getConnectionAsync( final ResultHandler<? super Connection> handler) { @@ -200,9 +191,7 @@ return future.futureBindResult; } /** * {@inheritDoc} */ public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("AuthenticatedConnectionFactory("); opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connection.java
@@ -439,7 +439,10 @@ * * Calling {@code close} on a connection that is already closed has no * effect. * * @see Connections#uncloseable(Connection) */ @Override void close(); /** opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionFactory.java
@@ -22,11 +22,13 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS. * Portions copyright 2011-2013 ForgeRock AS. */ package org.forgerock.opendj.ldap; import java.io.Closeable; /** * A connection factory provides an interface for obtaining a connection to a * Directory Server. Connection factories can be used to wrap other connection @@ -49,7 +51,28 @@ * should aim to close connections as soon as possible in order to avoid * resource contention. */ public interface ConnectionFactory { public interface ConnectionFactory extends Closeable { /** * Releases any resources associated with this connection factory. Depending * on the implementation a factory may: * <ul> * <li>do nothing * <li>close underlying connection factories (e.g. load-balancers) * <li>close pooled connections (e.g. connection pools) * <li>shutdown IO event service and related thread pools (e.g. Grizzly). * </ul> * Calling {@code close} on a connection factory which is already closed has * no effect. * <p> * Applications should avoid closing connection factories while there are * remaining active connections in use or connection attempts in progress. * * @see Connections#uncloseable(ConnectionFactory) */ @Override public void close(); /** * Asynchronously obtains a connection to the Directory Server associated * with this connection factory. The returned {@code FutureResult} can be opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
@@ -21,13 +21,11 @@ * CDDL HEADER END * * * Copyright 2011-2012 ForgeRock AS * Copyright 2011-2013 ForgeRock AS */ package org.forgerock.opendj.ldap; import java.io.Closeable; /** * A connection factory which maintains and re-uses a pool of connections. * Connections obtained from a connection pool are returned to the connection @@ -41,7 +39,7 @@ * Since pooled connections are re-used, applications must use operations such * as binds and StartTLS with extreme caution. */ public interface ConnectionPool extends ConnectionFactory, Closeable { public interface ConnectionPool extends ConnectionFactory { /** * Releases any resources associated with this connection pool. Pooled * connections will be permanently closed and this connection pool will no opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -386,6 +386,11 @@ return new ConnectionFactory() { @Override public void close() { factory.close(); } @Override public Connection getConnection() throws ErrorResultException { return factory.getConnection(); } @@ -396,9 +401,6 @@ return factory.getConnectionAsync(handler); } /** * {@inheritDoc} */ @Override public String toString() { return name; @@ -483,7 +485,7 @@ * @return An uncloseable view of the provided connection. */ public static Connection uncloseable(Connection connection) { return new AbstractConnectionWrapper(connection) { return new AbstractConnectionWrapper<Connection>(connection) { @Override public void close() { // Do nothing. @@ -496,6 +498,36 @@ }; } /** * Returns an uncloseable view of the provided connection factory. Attempts * to call {@link ConnectionFactory#close()} will be ignored. * * @param factory * The connection factory whose {@code close} method is to be * disabled. * @return An uncloseable view of the provided connection factory. */ public static ConnectionFactory uncloseable(final ConnectionFactory factory) { return new ConnectionFactory() { @Override public FutureResult<Connection> getConnectionAsync( ResultHandler<? super Connection> handler) { return factory.getConnectionAsync(handler); } @Override public Connection getConnection() throws ErrorResultException { return factory.getConnection(); } @Override public void close() { // Do nothing. } }; } // Prevent instantiation. private Connections() { // Do nothing. opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -75,18 +75,17 @@ * the pool completes. */ private final class ConnectionResultHandler implements ResultHandler<Connection> { /** * {@inheritDoc} */ @Override public void handleErrorResult(final ErrorResultException error) { // Connection attempt failed, so decrease the pool size. currentPoolSize.release(); if (DEBUG_LOG.isLoggable(Level.FINE)) { DEBUG_LOG.fine(String.format("Connection attempt failed: " + error.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); DEBUG_LOG.fine(String.format( "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", error .getMessage(), poolSize - currentPoolSize.availablePermits(), poolSize)); } QueueElement holder; @@ -103,17 +102,13 @@ holder.getWaitingFuture().handleErrorResult(error); } /** * {@inheritDoc} */ @Override public void handleResult(final Connection connection) { if (DEBUG_LOG.isLoggable(Level.FINE)) { DEBUG_LOG.fine(String.format("Connection attempt succeeded: " + " currentPoolSize=%d, poolSize=%d", poolSize DEBUG_LOG.fine(String.format( "Connection attempt succeeded: currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } publishConnection(connection); } } @@ -173,10 +168,12 @@ notifyErrorOccurred = error != null; if (!notifyClose) { if (listeners == null) { // Create and register first listener. If an error has // already occurred on the underlying connection, then // the listener may be immediately invoked so ensure // that it is already in the list. /* * Create and register first listener. If an error has * already occurred on the underlying connection, then * the listener may be immediately invoked so ensure * that it is already in the list. */ listeners = new CopyOnWriteArrayList<ConnectionEventListener>(); listeners.add(listener); connection.addConnectionEventListener(this); @@ -235,8 +232,10 @@ tmpListeners = listeners; } // Remove underlying listener if needed and do this before // subsequent connection events may occur. /* * Remove underlying listener if needed and do this before * subsequent connection events may occur. */ if (tmpListeners != null) { connection.removeConnectionEventListener(this); } @@ -245,17 +244,19 @@ if (connection.isValid()) { publishConnection(connection); } else { // The connection may have been disconnected by the remote // server, but the server may still be available. In order to // avoid leaving pending futures hanging indefinitely, we should // try to reconnect immediately. No need to release/acquire // currentPoolSize. /* * The connection may have been disconnected by the remote * server, but the server may still be available. In order to * avoid leaving pending futures hanging indefinitely, we should * try to reconnect immediately. No need to release/acquire * currentPoolSize. */ connection.close(); factory.getConnectionAsync(connectionResultHandler); if (DEBUG_LOG.isLoggable(Level.FINE)) { DEBUG_LOG.fine(String.format("Connection no longer valid. " + "currentPoolSize=%d, poolSize=%d", poolSize DEBUG_LOG.fine(String.format( "Connection no longer valid: currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } } @@ -542,7 +543,9 @@ } QueueElement(final ResultHandler<? super Connection> handler) { this.value = new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(handler); this.value = new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>( handler); } @Override @@ -575,24 +578,12 @@ private final int poolSize; private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>(); /** * Creates a new connection pool which will maintain {@code poolSize} * connections created using the provided connection factory. * * @param factory * The connection factory to use for creating new connections. * @param poolSize * The maximum size of the connection pool. */ FixedConnectionPool(final ConnectionFactory factory, final int poolSize) { this.factory = factory; this.poolSize = poolSize; this.currentPoolSize = new Semaphore(poolSize); } /** * {@inheritDoc} */ @Override public void close() { final LinkedList<Connection> idleConnections; @@ -602,8 +593,10 @@ } isClosed = true; // Remove any connections which are waiting in the queue as these // can be closed immediately. /* * Remove any connections which are waiting in the queue as these * can be closed immediately. */ idleConnections = new LinkedList<Connection>(); while (hasWaitingConnections()) { final QueueElement holder = queue.removeFirst(); @@ -621,11 +614,11 @@ for (final Connection connection : idleConnections) { closeConnection(connection); } // Close the underlying factory. factory.close(); } /** * {@inheritDoc} */ @Override public Connection getConnection() throws ErrorResultException { try { @@ -635,9 +628,6 @@ } } /** * {@inheritDoc} */ @Override public FutureResult<Connection> getConnectionAsync( final ResultHandler<? super Connection> handler) { @@ -672,9 +662,9 @@ currentPoolSize.release(); if (DEBUG_LOG.isLoggable(Level.FINE)) { DEBUG_LOG.fine(String.format("Connection no longer valid. " + "currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); DEBUG_LOG.fine(String.format( "Connection no longer valid: currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } } } else { @@ -688,9 +678,6 @@ } } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); @@ -763,14 +750,14 @@ holder.getWaitingFuture().handleErrorResult(e); if (DEBUG_LOG.isLoggable(Level.FINE)) { DEBUG_LOG.fine(String.format("Connection attempt failed: " + e.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); DEBUG_LOG.fine(String.format( "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", e .getMessage(), poolSize - currentPoolSize.availablePermits(), poolSize)); } } } else { final PooledConnection pooledConnection = new PooledConnection(connection); holder.getWaitingFuture().handleResult(pooledConnection); holder.getWaitingFuture().handleResult(new PooledConnection(connection)); } } opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -22,7 +22,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS. * Portions copyright 2011-2013 ForgeRock AS. */ package org.forgerock.opendj.ldap; @@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.logging.Level; @@ -64,7 +65,7 @@ import com.forgerock.opendj.util.AsynchronousFutureResult; import com.forgerock.opendj.util.FutureResultTransformer; import com.forgerock.opendj.util.StaticUtils; import com.forgerock.opendj.util.ReferenceCountedObject; import com.forgerock.opendj.util.Validator; /** @@ -75,7 +76,7 @@ /** * A connection that sends heart beats and supports all operations. */ private final class ConnectionImpl extends AbstractConnectionWrapper implements private final class ConnectionImpl extends AbstractConnectionWrapper<Connection> implements ConnectionEventListener, SearchResultHandler { /** @@ -85,9 +86,8 @@ * @param <R> * The type of result returned by the request. */ private abstract class DelayedFuture<R extends Result> extends AsynchronousFutureResult<R, ResultHandler<? super R>> implements Runnable { private abstract class DelayedFuture<R extends Result> extends AsynchronousFutureResult<R, ResultHandler<? super R>> implements Runnable { private volatile FutureResult<R> innerFuture = null; protected DelayedFuture(final ResultHandler<? super R> handler) { @@ -123,14 +123,19 @@ } // List of pending Bind or StartTLS requests which must be invoked // when the current heart beat completes. /* * List of pending Bind or StartTLS requests which must be invoked when * the current heart beat completes. */ private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>(); // Coordinates heart-beats with Bind and StartTLS requests. /* Coordinates heart-beats with Bind and StartTLS requests. */ private final Sync sync = new Sync(); // Timestamp of last response received (any response, not just heart beats). /* * Timestamp of last response received (any response, not just heart * beats). */ private volatile long timestamp = currentTimeMillis(); // Assume valid at creation. private ConnectionImpl(final Connection connection) { @@ -206,8 +211,10 @@ return connection.bindAsync(request, intermediateResponseHandler, timestamper( resultHandler, true)); } else { // A heart beat must be in progress so create a runnable task // which will be executed when the heart beat completes. /* * A heart beat must be in progress so create a runnable task * which will be executed when the heart beat completes. */ final DelayedFuture<BindResult> future = new DelayedFuture<BindResult>(resultHandler) { @Override @@ -216,7 +223,10 @@ timestamper(this, true)); } }; // Enqueue and flush if the heart beat has completed in the mean time. /* * Enqueue and flush if the heart beat has completed in the mean * time. */ pendingRequests.offer(future); flushPendingRequests(); return future; @@ -342,8 +352,11 @@ return connection.extendedRequestAsync(request, intermediateResponseHandler, timestamper(resultHandler, true)); } else { // A heart beat must be in progress so create a runnable task // which will be executed when the heart beat completes. /* * A heart beat must be in progress so create a runnable * task which will be executed when the heart beat * completes. */ final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) { @Override public FutureResult<R> dispatch() { @@ -351,7 +364,11 @@ intermediateResponseHandler, timestamper(this, true)); } }; // Enqueue and flush if the heart beat has completed in the mean time. /* * Enqueue and flush if the heart beat has completed in the * mean time. */ pendingRequests.offer(future); flushPendingRequests(); return future; @@ -382,7 +399,7 @@ @Override public void handleErrorResult(final ErrorResultException error) { if (DEBUG_LOG.isLoggable(Level.FINE)) { DEBUG_LOG.fine(String.format("Heartbeat failed: " + error.getMessage())); DEBUG_LOG.fine(String.format("Heartbeat failed: %s", error.getMessage())); } updateTimestamp(); releaseHeartBeatLock(); @@ -582,8 +599,10 @@ } private void acquireBindOrStartTLSLock() throws ErrorResultException { // Wait for pending heartbeats and prevent new heartbeats from // being sent while the bind is in progress. /* * Wait for pending heartbeats and prevent new heartbeats from being * sent while the bind is in progress. */ try { if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) { // Give up - it looks like the connection is dead. @@ -597,8 +616,11 @@ private void flushPendingRequests() { if (!pendingRequests.isEmpty()) { // The pending requests will acquire the shared lock, but we take // it here anyway to ensure that pending requests do not get blocked. /* * The pending requests will acquire the shared lock, but we * take it here anyway to ensure that pending requests do not * get blocked. */ if (sync.tryLockShared()) { try { Runnable pendingRequest; @@ -617,7 +639,10 @@ connection.removeConnectionEventListener(this); activeConnections.remove(this); if (activeConnections.isEmpty()) { // This is the last active connection, so stop the heartbeat. /* * This is the last active connection, so stop the * heartbeat. */ heartBeatFuture.cancel(false); } } @@ -633,22 +658,33 @@ } private void sendHeartBeat() { // Only send the heartbeat if the connection has been idle for some time. /* * Only send the heartbeat if the connection has been idle for some * time. */ if (currentTimeMillis() < (timestamp + minDelayMS)) { return; } // Don't send a heart beat if there is already a heart beat, // bind, or startTLS in progress. Note that the bind/startTLS // response will update the timestamp as if it were a heart beat. /* * Don't send a heart beat if there is already a heart beat, bind, * or startTLS in progress. Note that the bind/startTLS response * will update the timestamp as if it were a heart beat. */ if (sync.tryLockExclusively()) { try { connection.searchAsync(heartBeatRequest, null, this); } catch (final Exception e) { // This may happen when we attempt to send the heart beat just // after the connection is closed but before we are notified. /* * This may happen when we attempt to send the heart beat * just after the connection is closed but before we are * notified. */ // Release the lock because we're never going to get a response. /* * Release the lock because we're never going to get a * response. */ releaseHeartBeatLock(); } } @@ -755,7 +791,7 @@ * </ul> */ private static final class Sync extends AbstractQueuedSynchronizer { // Lock states. Positive values indicate that the shared lock is taken. /* Lock states. Positive values indicate that the shared lock is taken. */ private static final int UNLOCKED = 0; // initial state private static final int LOCKED_EXCLUSIVELY = -1; @@ -809,8 +845,10 @@ } final int newState = state - 1; if (compareAndSetState(state, newState)) { // We could always return true here, but since there cannot // be waiting readers we can specialize for waiting writers. /* * We could always return true here, but since there cannot * be waiting readers we can specialize for waiting writers. */ return newState == UNLOCKED; } } @@ -851,83 +889,29 @@ private final SearchRequest heartBeatRequest; private final long interval; private final long minDelayMS; private final ScheduledExecutorService scheduler; private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler; private final long timeoutMS; private final TimeUnit unit; private AtomicBoolean isClosed = new AtomicBoolean(); /** * Creates a new heart-beat connection factory which will create connections * using the provided connection factory and periodically ping any created * connections in order to detect that they are still alive every 10 seconds * using the default scheduler. * * @param factory * The connection factory to use for creating connections. */ HeartBeatConnectionFactory(final ConnectionFactory factory) { this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler()); this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, null); } /** * Creates a new heart-beat connection factory which will create connections * using the provided connection factory and periodically ping any created * connections in order to detect that they are still alive using the * specified frequency and the default scheduler. * * @param factory * The connection factory to use for creating connections. * @param interval * The interval between keepalive pings. * @param unit * The time unit for the interval between keepalive pings. */ HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, final TimeUnit unit) { this(factory, interval, unit, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler()); this(factory, interval, unit, DEFAULT_SEARCH, null); } /** * Creates a new heart-beat connection factory which will create connections * using the provided connection factory and periodically ping any created * connections using the specified search request in order to detect that * they are still alive. * * @param factory * The connection factory to use for creating connections. * @param interval * The interval between keepalive pings. * @param unit * The time unit for the interval between keepalive pings. * @param heartBeat * The search request to use for keepalive pings. */ HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, final TimeUnit unit, final SearchRequest heartBeat) { this(factory, interval, unit, heartBeat, StaticUtils.getDefaultScheduler()); this(factory, interval, unit, heartBeat, null); } /** * Creates a new heart-beat connection factory which will create connections * using the provided connection factory and periodically ping any created * connections using the specified search request in order to detect that * they are still alive. * * @param factory * The connection factory to use for creating connections. * @param interval * The interval between keepalive pings. * @param unit * The time unit for the interval between keepalive pings. * @param heartBeat * The search request to use for keepalive pings. * @param scheduler * The scheduler which should for periodically sending keepalive * pings. */ HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, final TimeUnit unit, final SearchRequest heartBeat, final ScheduledExecutorService scheduler) { Validator.ensureNotNull(factory, heartBeat, unit, scheduler); Validator.ensureNotNull(factory, heartBeat, unit); Validator.ensureTrue(interval >= 0, "negative timeout"); this.heartBeatRequest = heartBeat; @@ -935,22 +919,34 @@ this.unit = unit; this.activeConnections = new LinkedList<ConnectionImpl>(); this.factory = factory; this.scheduler = scheduler; this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler); this.timeoutMS = unit.toMillis(interval) * 2; this.minDelayMS = unit.toMillis(interval) / 2; } /** * {@inheritDoc} */ @Override public void close() { if (isClosed.compareAndSet(false, true)) { synchronized (activeConnections) { if (!activeConnections.isEmpty()) { if (DEBUG_LOG.isLoggable(Level.FINE)) { DEBUG_LOG.fine(String.format( "HeartbeatConnectionFactory '%s' is closing while %d " + "active connections remain", toString(), activeConnections.size())); } } } scheduler.release(); factory.close(); } } @Override public Connection getConnection() throws ErrorResultException { return adaptConnection(factory.getConnection()); } /** * {@inheritDoc} */ @Override public FutureResult<Connection> getConnectionAsync( final ResultHandler<? super Connection> handler) { @@ -967,9 +963,6 @@ return future; } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); @@ -984,8 +977,8 @@ synchronized (activeConnections) { connection.addConnectionEventListener(heartBeatConnection); if (activeConnections.isEmpty()) { // This is the first active connection, so start the heart beat. heartBeatFuture = scheduler.scheduleWithFixedDelay(new Runnable() { /* This is the first active connection, so start the heart beat. */ heartBeatFuture = scheduler.get().scheduleWithFixedDelay(new Runnable() { @Override public void run() { final ConnectionImpl[] tmp; opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/InternalConnectionFactory.java
@@ -22,7 +22,7 @@ * * * Copyright 2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS. * Portions copyright 2011-2013 ForgeRock AS. */ package org.forgerock.opendj.ldap; @@ -50,9 +50,7 @@ * The type of client context. */ final class InternalConnectionFactory<C> implements ConnectionFactory { private final ServerConnectionFactory<C, Integer> factory; private final C clientContext; InternalConnectionFactory(final ServerConnectionFactory<C, Integer> factory, @@ -61,17 +59,16 @@ this.clientContext = clientContext; } /** * {@inheritDoc} */ @Override public void close() { // Nothing to do. } public Connection getConnection() throws ErrorResultException { final ServerConnection<Integer> serverConnection = factory.handleAccept(clientContext); return new InternalConnection(serverConnection); } /** * {@inheritDoc} */ public FutureResult<Connection> getConnectionAsync( final ResultHandler<? super Connection> handler) { final ServerConnection<Integer> serverConnection; @@ -91,9 +88,6 @@ return new CompletedFutureResult<Connection>(connection); } /** * {@inheritDoc} */ public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("InternalConnectionFactory("); opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java
@@ -22,7 +22,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS. * Portions copyright 2011-2013 ForgeRock AS. */ package org.forgerock.opendj.ldap; @@ -39,9 +39,10 @@ * Server. */ public final class LDAPConnectionFactory implements ConnectionFactory { // We implement the factory using the pimpl idiom in order to avoid making // too many implementation classes public. /* * We implement the factory using the pimpl idiom in order to avoid making * too many implementation classes public. */ private final LDAPConnectionFactoryImpl impl; /** @@ -125,18 +126,17 @@ } } /** * {@inheritDoc} */ @Override public void close() { impl.close(); } @Override public FutureResult<Connection> getConnectionAsync( final ResultHandler<? super Connection> handler) { return impl.getConnectionAsync(handler); } /** * {@inheritDoc} */ @Override public Connection getConnection() throws ErrorResultException { return impl.getConnection(); @@ -183,9 +183,6 @@ return impl.getSocketAddress(); } /** * {@inheritDoc} */ @Override public String toString() { return impl.toString(); opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
@@ -22,7 +22,7 @@ * * * Copyright 2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS. * Portions copyright 2011-2013 ForgeRock AS. */ package org.forgerock.opendj.ldap; @@ -37,28 +37,22 @@ final class LoadBalancer implements ConnectionFactory { private final LoadBalancingAlgorithm algorithm; /** * Creates a new load balancer using the provided algorithm. * * @param algorithm * The load balancing algorithm which will be used to obtain the * next connection factory. */ public LoadBalancer(final LoadBalancingAlgorithm algorithm) { LoadBalancer(final LoadBalancingAlgorithm algorithm) { Validator.ensureNotNull(algorithm); this.algorithm = algorithm; } /** * {@inheritDoc} */ @Override public void close() { // Delegate to the algorithm. algorithm.close(); } @Override public Connection getConnection() throws ErrorResultException { return algorithm.getConnectionFactory().getConnection(); } /** * {@inheritDoc} */ @Override public FutureResult<Connection> getConnectionAsync( final ResultHandler<? super Connection> resultHandler) { @@ -76,9 +70,7 @@ return factory.getConnectionAsync(resultHandler); } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("LoadBalancer("); opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/LoadBalancingAlgorithm.java
@@ -22,17 +22,28 @@ * * * Copyright 2010 Sun Microsystems, Inc. * Portions copyright 2013 ForgeRock AS. */ package org.forgerock.opendj.ldap; import java.io.Closeable; /** * A load balancing algorithm distributes connection requests across one or more * underlying connection factories in an implementation defined manner. * * @see Connections#newLoadBalancer(LoadBalancingAlgorithm) newLoadBalancer */ public interface LoadBalancingAlgorithm { public interface LoadBalancingAlgorithm extends Closeable { /** * Releases any resources associated with this algorithm, including any * associated connection factories. */ @Override public void close(); /** * Returns a connection factory which should be used in order to satisfy the * next connection request. opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/DefaultTCPNIOTransportTestCase.java
@@ -22,11 +22,12 @@ * * * Copyright 2010 Sun Microsystems, Inc. * Portions copyright 2012 ForgeRock AS. * Portions copyright 2012-2013 ForgeRock AS. */ package com.forgerock.opendj.ldap; import static com.forgerock.opendj.ldap.DefaultTCPNIOTransport.DEFAULT_TRANSPORT; import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress; import static org.testng.Assert.assertTrue; @@ -36,6 +37,8 @@ import org.glassfish.grizzly.nio.transport.TCPNIOTransport; import org.testng.annotations.Test; import com.forgerock.opendj.util.ReferenceCountedObject; /** * Tests DefaultTCPNIOTransport class. */ @@ -52,9 +55,10 @@ @Test(enabled = false) public void testGetInstance() throws Exception { // Create a transport. final TCPNIOTransport transport = DefaultTCPNIOTransport.getInstance(); final ReferenceCountedObject<TCPNIOTransport>.Reference transport = DEFAULT_TRANSPORT.acquire(); SocketAddress socketAddress = findFreeSocketAddress(); transport.bind(socketAddress); transport.get().bind(socketAddress); // Establish a socket connection to see if the transport factory works. final Socket socket = new Socket(); @@ -66,6 +70,7 @@ // Don't stop the transport because it is shared with the ldap server. } finally { socket.close(); transport.release(); } } } opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/ReferenceCountedObjectTestCase.java
New file @@ -0,0 +1,151 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions copyright 2013 ForgeRock AS. */ package com.forgerock.opendj.util; import static org.fest.assertions.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import org.testng.annotations.Test; /** * This Test Class tests {@link ReferenceCountedObject}. */ @SuppressWarnings("javadoc") public class ReferenceCountedObjectTestCase extends UtilTestCase { private interface Impl { void destroyInstance(Object instance); Object newInstance(); } private final Object object = "Test Object"; @Test public void testAcquire() throws Exception { final Impl impl = mock(Impl.class); when(impl.newInstance()).thenReturn(object); final ReferenceCountedObject<Object> rco = rco(impl); // First acquisition should create new instance. final ReferenceCountedObject<Object>.Reference ref1 = rco.acquire(); assertThat(ref1.get()).isSameAs(object); verify(impl).newInstance(); verifyNoMoreInteractions(impl); // Second acquisition should just bump the ref count. final ReferenceCountedObject<Object>.Reference ref2 = rco.acquire(); assertThat(ref2.get()).isSameAs(object); verifyNoMoreInteractions(impl); // First dereference should just decrease the ref count. ref1.release(); verifyNoMoreInteractions(impl); // Second dereference should destroy the instance. ref2.release(); verify(impl).destroyInstance(object); verifyNoMoreInteractions(impl); } @Test public void testAcquireIfNull() throws Exception { final Object otherObject = "Other object"; final Impl impl = mock(Impl.class); when(impl.newInstance()).thenReturn(object); final ReferenceCountedObject<Object> rco = rco(impl); final ReferenceCountedObject<Object>.Reference ref = rco.acquireIfNull(otherObject); verify(impl, never()).newInstance(); assertThat(ref.get()).isSameAs(otherObject); ref.release(); verifyNoMoreInteractions(impl); } /** * This test attempts to test that finalization works. It loops at most 100 * times performing GCs and checking to see if the finalizer was called. * Usually objects are finalized after 2 GCs, so the loop should complete * quite quickly. * * @throws Exception * If an unexpected error occurred. */ @Test public void testFinalization() throws Exception { final Impl impl = mock(Impl.class); when(impl.newInstance()).thenReturn(object); final ReferenceCountedObject<Object> rco = rco(impl); ReferenceCountedObject<Object>.Reference ref = rco.acquire(); System.gc(); System.gc(); verify(impl, never()).destroyInstance(object); // Read in order to prevent optimization. if (ref != null) { ref = null; } for (int i = 0; i < 100; i++) { System.gc(); try { verify(impl).destroyInstance(object); break; // Finalized so stop. } catch (final Throwable t) { // Retry. } } verify(impl).destroyInstance(object); } @Test(expectedExceptions = NullPointerException.class) public void testStaleReference() throws Exception { final Impl impl = mock(Impl.class); when(impl.newInstance()).thenReturn(object); final ReferenceCountedObject<Object> rco = rco(impl); final ReferenceCountedObject<Object>.Reference ref = rco.acquire(); ref.release(); ref.get(); } private ReferenceCountedObject<Object> rco(final Impl impl) { return new ReferenceCountedObject<Object>() { @Override protected void destroyInstance(final Object instance) { impl.destroyInstance(instance); } @Override protected Object newInstance() { return impl.newInstance(); } }; } } opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthenticatedConnectionFactory.java
@@ -67,10 +67,9 @@ * An authenticated connection supports all operations except Bind * operations. */ static final class AuthenticatedConnection extends AbstractConnectionWrapper { static final class AuthenticatedConnection extends AbstractConnectionWrapper<Connection> { private final BindRequest request; private volatile BindResult result; private AuthenticatedConnection(final Connection connection, final BindRequest request, @@ -80,28 +79,19 @@ this.result = result; } /** /* * Bind operations are not supported by pre-authenticated connections. * This method will always throw {@code UnsupportedOperationException}. * These methods will always throw {@code UnsupportedOperationException}. */ /** * {@inheritDoc} */ public BindResult bind(BindRequest request) throws ErrorResultException { throw new UnsupportedOperationException(); } /** * {@inheritDoc} */ public BindResult bind(String name, char[] password) throws ErrorResultException { throw new UnsupportedOperationException(); } /** * {@inheritDoc} */ public FutureResult<BindResult> bindAsync(BindRequest request, IntermediateResponseHandler intermediateResponseHandler, ResultHandler<? super BindResult> resultHandler) { @@ -140,15 +130,19 @@ throw new UnsupportedOperationException(); } // Wrap the client handler so that we can update the connection // state. /* * Wrap the client handler so that we can update the connection * state. */ final ResultHandler<? super BindResult> clientHandler = handler; final ResultHandler<BindResult> handlerWrapper = new ResultHandler<BindResult>() { public void handleErrorResult(final ErrorResultException error) { // This connection is now unauthenticated so prevent // further use. /* * This connection is now unauthenticated so prevent further * use. */ connection.close(); if (clientHandler != null) { @@ -170,9 +164,6 @@ return connection.bindAsync(request, null, handlerWrapper); } /** * {@inheritDoc} */ public String toString() { StringBuilder builder = new StringBuilder(); builder.append("AuthenticatedConnection("); @@ -255,9 +246,11 @@ this.request = request; } /** * {@inheritDoc} */ @Override public void close() { parentFactory.close(); } public Connection getConnection() throws ErrorResultException { final Connection connection = parentFactory.getConnection(); BindResult bindResult = null; @@ -268,14 +261,14 @@ connection.close(); } } // If the bind didn't succeed then an exception will have been thrown // and this line will not be reached. /* * If the bind didn't succeed then an exception will have been thrown * and this line will not be reached. */ return new AuthenticatedConnection(connection, request, bindResult); } /** * {@inheritDoc} */ public FutureResult<Connection> getConnectionAsync( final ResultHandler<? super Connection> handler) { final FutureResultImpl future = new FutureResultImpl(request, handler); @@ -317,9 +310,6 @@ return this; } /** * {@inheritDoc} */ public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("AuthenticatedConnectionFactory("); opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPAuthnFilter.java
@@ -104,17 +104,16 @@ private boolean supportHTTPBasicAuthentication = true; private ServletApiVersionAdapter syncFactory; /** * {@inheritDoc} */ @Override public void destroy() { // TODO: We should release any resources maintained by the filter, such as connection pools. if (searchLDAPConnectionFactory != null) { searchLDAPConnectionFactory.close(); } if (bindLDAPConnectionFactory != null) { bindLDAPConnectionFactory.close(); } } /** * {@inheritDoc} */ @Override public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain) throws IOException, ServletException { @@ -295,9 +294,6 @@ } } /** * {@inheritDoc} */ @Override public void init(final FilterConfig config) throws ServletException { // FIXME: make it possible to configure the filter externally, especially opendj3/opendj-rest2ldap-servlet/src/main/java/org/forgerock/opendj/rest2ldap/servlet/Rest2LDAPConnectionFactoryProvider.java
@@ -15,6 +15,9 @@ */ package org.forgerock.opendj.rest2ldap.servlet; import static org.forgerock.json.resource.Resources.newInternalConnectionFactory; import static org.forgerock.opendj.rest2ldap.Rest2LDAP.configureConnectionFactory; import java.io.InputStream; import java.util.Map; @@ -25,8 +28,11 @@ import org.codehaus.jackson.map.ObjectMapper; import org.forgerock.json.fluent.JsonValue; import org.forgerock.json.resource.CollectionResourceProvider; import org.forgerock.json.resource.Connection; import org.forgerock.json.resource.ConnectionFactory; import org.forgerock.json.resource.Resources; import org.forgerock.json.resource.FutureResult; import org.forgerock.json.resource.ResourceException; import org.forgerock.json.resource.ResultHandler; import org.forgerock.json.resource.Router; import org.forgerock.opendj.rest2ldap.AuthorizationPolicy; import org.forgerock.opendj.rest2ldap.Rest2LDAP; @@ -90,8 +96,8 @@ final org.forgerock.opendj.ldap.ConnectionFactory ldapFactory; if (ldapFactoryName != null) { ldapFactory = Rest2LDAP.configureConnectionFactory(configuration.get( "ldapConnectionFactories").required(), ldapFactoryName); configureConnectionFactory(configuration.get("ldapConnectionFactories") .required(), ldapFactoryName); } else { ldapFactory = null; } @@ -107,7 +113,33 @@ .configureMapping(mapping).build(); router.addRoute(mappingUrl, provider); } return Resources.newInternalConnectionFactory(router); final ConnectionFactory factory = newInternalConnectionFactory(router); if (ldapFactory != null) { /* * Return a wrapper which will release resources associated with * the LDAP connection factory (pooled connections, transport, * etc). */ return new ConnectionFactory() { @Override public FutureResult<Connection> getConnectionAsync( ResultHandler<Connection> handler) { return factory.getConnectionAsync(handler); } @Override public Connection getConnection() throws ResourceException { return factory.getConnection(); } @Override public void close() { ldapFactory.close(); } }; } else { return factory; } } catch (final ServletException e) { // Rethrow. throw e; opendj3/opendj-server2x-adapter/src/main/java/org/forgerock/opendj/adapter/server2x/Adapters.java
@@ -145,6 +145,11 @@ ConnectionFactory factory = new ConnectionFactory() { @Override public void close() { // Nothing to do. } @Override public FutureResult<Connection> getConnectionAsync( ResultHandler<? super Connection> handler) { if (handler != null) {