/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009-2010 Sun Microsystems, Inc. */ package org.opends.sdk; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.opends.sdk.requests.Requests; import org.opends.sdk.requests.SearchRequest; import org.opends.sdk.responses.ExtendedResult; import org.opends.sdk.responses.Result; import org.opends.sdk.responses.SearchResultEntry; import org.opends.sdk.responses.SearchResultReference; import com.sun.opends.sdk.util.AsynchronousConnectionDecorator; import com.sun.opends.sdk.util.FutureResultTransformer; import com.sun.opends.sdk.util.StaticUtils; import com.sun.opends.sdk.util.Validator; /** * An heart beat connection factory can be used to create connections that sends * a periodic search request to a Directory Server. */ final class HeartBeatConnectionFactory extends AbstractConnectionFactory { /** * An asynchronous connection that sends heart beats and supports all * operations. */ private final class AsynchronousConnectionImpl extends AsynchronousConnectionDecorator implements ConnectionEventListener, SearchResultHandler { private long lastSuccessfulPing; private FutureResult lastPingFuture; private AsynchronousConnectionImpl(final AsynchronousConnection connection) { super(connection); } @Override public void handleConnectionClosed() { notifyClosed(); } @Override public void handleConnectionError(final boolean isDisconnectNotification, final ErrorResultException error) { notifyClosed(); } /** * {@inheritDoc} */ @Override public boolean handleEntry(final SearchResultEntry entry) { // Ignore. return true; } /** * {@inheritDoc} */ @Override public void handleErrorResult(final ErrorResultException error) { connection.close(Requests.newUnbindRequest(), "Heartbeat retured error: " + error); } /** * {@inheritDoc} */ @Override public boolean handleReference(final SearchResultReference reference) { // Ignore. return true; } /** * {@inheritDoc} */ @Override public void handleResult(final Result result) { lastSuccessfulPing = System.currentTimeMillis(); } @Override public void handleUnsolicitedNotification(final ExtendedResult notification) { // Do nothing } /** * {@inheritDoc} */ @Override public boolean isValid() { return connection.isValid() && (lastSuccessfulPing <= 0 || System.currentTimeMillis() - lastSuccessfulPing < unit.toMillis(interval) * 2); } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("HeartBeatConnection("); builder.append(connection); builder.append(')'); return builder.toString(); } private void notifyClosed() { synchronized (activeConnections) { connection.removeConnectionEventListener(this); activeConnections.remove(this); if (activeConnections.isEmpty()) { // This is the last active connection, so stop the heart beat. heartBeatFuture.cancel(false); } } } } private final class FutureResultImpl extends FutureResultTransformer implements ResultHandler { private FutureResultImpl( final ResultHandler handler) { super(handler); } /** * {@inheritDoc} */ @Override protected AsynchronousConnection transformResult( final AsynchronousConnection connection) throws ErrorResultException { final AsynchronousConnectionImpl heartBeatConnection = new AsynchronousConnectionImpl( connection); synchronized (activeConnections) { connection.addConnectionEventListener(heartBeatConnection); if (activeConnections.isEmpty()) { // This is the first active connection, so start the heart beat. heartBeatFuture = scheduler.scheduleWithFixedDelay( new HeartBeatRunnable(), 0, interval, unit); } activeConnections.add(heartBeatConnection); } return heartBeatConnection; } } private final class HeartBeatRunnable implements Runnable { private HeartBeatRunnable() { // Nothing to do. } @Override public void run() { synchronized (activeConnections) { for (final AsynchronousConnectionImpl connection : activeConnections) { if (connection.lastPingFuture == null || connection.lastPingFuture.isDone()) { connection.lastPingFuture = connection.search(heartBeat, connection, null); } } } } } private final SearchRequest heartBeat; private final long interval; private final ScheduledExecutorService scheduler; private final TimeUnit unit; private final List activeConnections; private final ConnectionFactory factory; private static final SearchRequest DEFAULT_SEARCH = Requests .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1"); private ScheduledFuture heartBeatFuture; /** * Creates a new heart-beat connection factory which will create connections * using the provided connection factory and periodically ping any created * connections in order to detect that they are still alive every 10 seconds * using the default scheduler. * * @param factory * The connection factory to use for creating connections. */ HeartBeatConnectionFactory(final ConnectionFactory factory) { this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, StaticUtils .getDefaultScheduler()); } /** * Creates a new heart-beat connection factory which will create connections * using the provided connection factory and periodically ping any created * connections in order to detect that they are still alive using the * specified frequency and the default scheduler. * * @param factory * The connection factory to use for creating connections. * @param interval * The interval between keepalive pings. * @param unit * The time unit for the interval between keepalive pings. */ HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, final TimeUnit unit) { this(factory, interval, unit, DEFAULT_SEARCH, StaticUtils .getDefaultScheduler()); } /** * Creates a new heart-beat connection factory which will create connections * using the provided connection factory and periodically ping any created * connections using the specified search request in order to detect that they * are still alive. * * @param factory * The connection factory to use for creating connections. * @param interval * The interval between keepalive pings. * @param unit * The time unit for the interval between keepalive pings. * @param heartBeat * The search request to use for keepalive pings. */ HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, final TimeUnit unit, final SearchRequest heartBeat) { this(factory, interval, unit, heartBeat, StaticUtils.getDefaultScheduler()); } /** * Creates a new heart-beat connection factory which will create connections * using the provided connection factory and periodically ping any created * connections using the specified search request in order to detect that they * are still alive. * * @param factory * The connection factory to use for creating connections. * @param interval * The interval between keepalive pings. * @param unit * The time unit for the interval between keepalive pings. * @param heartBeat * The search request to use for keepalive pings. * @param scheduler * The scheduler which should for periodically sending keepalive * pings. */ HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, final TimeUnit unit, final SearchRequest heartBeat, final ScheduledExecutorService scheduler) { Validator.ensureNotNull(factory, heartBeat, unit, scheduler); Validator.ensureTrue(interval >= 0, "negative timeout"); this.heartBeat = heartBeat; this.interval = interval; this.unit = unit; this.activeConnections = new LinkedList(); this.factory = factory; this.scheduler = scheduler; } @Override public FutureResult getAsynchronousConnection( final ResultHandler handler) { final FutureResultImpl future = new FutureResultImpl(handler); future.setFutureResult(factory.getAsynchronousConnection(future)); return future; } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("HeartBeatConnectionFactory("); builder.append(String.valueOf(factory)); builder.append(')'); return builder.toString(); } }