From 7274fbd5fac971154d4c3307b3cda47843f6d02c Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 08 Jul 2013 17:23:49 +0000
Subject: [PATCH] Unit tests for OPENDJ-1058: HeartbeatConnectionFactory does not actively shutdown dead connections
---
opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java | 426 +++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 426 insertions(+), 0 deletions(-)
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
new file mode 100644
index 0000000..ab5a5a5
--- /dev/null
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
@@ -0,0 +1,426 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS.
+ */
+
+package org.forgerock.opendj.ldap;
+
+import static org.fest.assertions.Assertions.assertThat;
+import static org.forgerock.opendj.ldap.Connections.newHeartBeatConnectionFactory;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
+import static org.forgerock.opendj.ldap.SearchScope.BASE_OBJECT;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnection;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnectionFactory;
+import static org.forgerock.opendj.ldap.requests.Requests.newModifyRequest;
+import static org.forgerock.opendj.ldap.requests.Requests.newSearchRequest;
+import static org.forgerock.opendj.ldap.responses.Responses.newResult;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.responses.Result;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.Test;
+
+/**
+ * Tests the connection pool implementation..
+ */
+@SuppressWarnings("javadoc")
+public class HeartBeatConnectionFactoryTestCase extends SdkTestCase {
+
+ // @formatter:off
+
+ /*
+ * Key features which need testing:
+ *
+ * - lazy scheduler registration and deregistration
+ * - scheduled task only sends heart-beat when connection is open
+ * - connection remains valid when any response is received
+ * - connection remains valid when a heart beat response is received
+ * - connection becomes invalid if no response is received during timeout
+ * - heart beat only sent when connection is idle
+ * - slow bind / startTLS prevents heart beat
+ * - slow heart beat prevents bind / start TLS
+ * - support concurrent bind / start TLS
+ */
+
+ // @formatter:on
+
+ private static final class MockScheduler implements ScheduledExecutorService {
+
+ private Runnable command;
+
+ private long delay;
+
+ private boolean isScheduled = false;
+
+ private TimeUnit unit;
+
+ @Override
+ public boolean awaitTermination(final long timeout, final TimeUnit unit)
+ throws InterruptedException {
+ // Unused.
+ return false;
+ }
+
+ @Override
+ public void execute(final Runnable command) {
+ // Unused.
+
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ // Unused.
+ return null;
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks,
+ final long timeout, final TimeUnit unit) throws InterruptedException {
+ // Unused.
+ return null;
+ }
+
+ @Override
+ public <T> T invokeAny(final Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ // Unused.
+ return null;
+ }
+
+ @Override
+ public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout,
+ final TimeUnit unit) throws InterruptedException, ExecutionException,
+ TimeoutException {
+ // Unused.
+ return null;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ // Unused.
+ return false;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ // Unused.
+ return false;
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay,
+ final TimeUnit unit) {
+ // Unused.
+ return null;
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(final Runnable command, final long delay,
+ final TimeUnit unit) {
+ // Unused.
+ return null;
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command,
+ final long initialDelay, final long period, final TimeUnit unit) {
+ // Unused.
+ return null;
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command,
+ final long initialDelay, final long delay, final TimeUnit unit) {
+ this.command = command;
+ this.delay = delay;
+ this.unit = unit;
+ this.isScheduled = true;
+ return new ScheduledFuture<Object>() {
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ isScheduled = false;
+ return true;
+ }
+
+ @Override
+ public int compareTo(final Delayed o) {
+ // Unused.
+ return 0;
+ }
+
+ @Override
+ public Object get() throws InterruptedException, ExecutionException {
+ // Unused.
+ return null;
+ }
+
+ @Override
+ public Object get(final long timeout, final TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ // Unused.
+ return null;
+ }
+
+ @Override
+ public long getDelay(final TimeUnit unit) {
+ // Unused.
+ return 0;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return !isScheduled;
+ }
+
+ @Override
+ public boolean isDone() {
+ // Unused.
+ return false;
+ }
+
+ };
+ }
+
+ @Override
+ public void shutdown() {
+ // Unused.
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ // Unused.
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <T> Future<T> submit(final Callable<T> task) {
+ // Unused.
+ return null;
+ }
+
+ @Override
+ public Future<?> submit(final Runnable task) {
+ // Unused.
+ return null;
+ }
+
+ @Override
+ public <T> Future<T> submit(final Runnable task, final T result) {
+ // Unused.
+ return null;
+ }
+ }
+
+ @Test(enabled = false)
+ public void testHeartBeatTimeout() throws Exception {
+ // Mock connection which never responds to heartbeat.
+ final List<ConnectionEventListener> listeners = new LinkedList<ConnectionEventListener>();
+ final Connection connection = mockConnection(listeners);
+ when(connection.isValid()).thenReturn(true);
+
+ // Underlying connection factory.
+ final ConnectionFactory factory = mockConnectionFactory(connection);
+
+ // Create heart beat connection factory.
+ final SearchRequest hb = newSearchRequest("dc=test", BASE_OBJECT, "(objectclass=*)", "1.1");
+ final MockScheduler scheduler = new MockScheduler();
+ final ConnectionFactory hbcf =
+ newHeartBeatConnectionFactory(factory, 0, TimeUnit.MILLISECONDS, hb, scheduler);
+
+ // First connection should cause heart beat to be scheduled.
+ final Connection hbc = hbcf.getConnection();
+ assertThat(scheduler.isScheduled).isTrue();
+ assertThat(scheduler.command).isNotNull();
+ assertThat(scheduler.delay).isEqualTo(0);
+ assertThat(scheduler.unit).isEqualTo(TimeUnit.MILLISECONDS);
+ assertThat(listeners).hasSize(1);
+
+ // The connection should be immediately invalid due to 0 timeout.
+ assertThat(connection.isValid()).isTrue();
+ assertThat(hbc.isValid()).isFalse();
+
+ /*
+ * Attempt to send heartbeat. This should trigger the connection to be
+ * closed and all subsequent request attempts to fail.
+ */
+ scheduler.command.run();
+
+ // The underlying connection should have been closed.
+ verify(connection).close();
+
+ /*
+ * ...and the scheduled heart beat stopped because there are no
+ * remaining connections.
+ */
+ assertThat(scheduler.isScheduled).isFalse();
+
+ // Attempt to send a new request: it should fail immediately.
+ @SuppressWarnings("unchecked")
+ final ResultHandler<Result> mockHandler = mock(ResultHandler.class);
+ hbc.modifyAsync(newModifyRequest(DN.rootDN()), null, mockHandler);
+ final ArgumentCaptor<ErrorResultException> arg =
+ ArgumentCaptor.forClass(ErrorResultException.class);
+ verify(mockHandler).handleErrorResult(arg.capture());
+ assertThat(arg.getValue().getResult().getResultCode()).isEqualTo(
+ ResultCode.CLIENT_SIDE_SERVER_DOWN);
+ }
+
+ @Test
+ public void testSchedulerRegistration() throws Exception {
+ // Three mock connections.
+ final List<ConnectionEventListener> listeners1 = new LinkedList<ConnectionEventListener>();
+ final Connection connection1 = heartBeat(mockConnection(listeners1), ResultCode.SUCCESS);
+
+ final List<ConnectionEventListener> listeners2 = new LinkedList<ConnectionEventListener>();
+ final Connection connection2 = heartBeat(mockConnection(listeners2), ResultCode.SUCCESS);
+
+ final List<ConnectionEventListener> listeners3 = new LinkedList<ConnectionEventListener>();
+ final Connection connection3 = heartBeat(mockConnection(listeners3), ResultCode.SUCCESS);
+
+ // Underlying connection factory.
+ final ConnectionFactory factory =
+ mockConnectionFactory(connection1, connection2, connection3);
+
+ // Create heart beat connection factory.
+ final SearchRequest hb = newSearchRequest("dc=test", BASE_OBJECT, "(objectclass=*)", "1.1");
+ final MockScheduler scheduler = new MockScheduler();
+ final ConnectionFactory hbcf =
+ newHeartBeatConnectionFactory(factory, 0, TimeUnit.MILLISECONDS, hb, scheduler);
+
+ // Heart beat should not be scheduled yet.
+ assertThat(scheduler.isScheduled).isFalse();
+
+ // First connection should cause heart beat to be scheduled.
+ hbcf.getConnection();
+ assertThat(scheduler.isScheduled).isTrue();
+ assertThat(scheduler.command).isNotNull();
+ assertThat(scheduler.delay).isEqualTo(0);
+ assertThat(scheduler.unit).isEqualTo(TimeUnit.MILLISECONDS);
+ assertThat(listeners1).hasSize(1);
+
+ // Second connection should not change anything.
+ hbcf.getConnection();
+ assertThat(scheduler.isScheduled).isTrue();
+ assertThat(listeners2).hasSize(1);
+
+ // Check heart-beat sent to both connections.
+ scheduler.command.run();
+ verify(connection1, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+ verify(connection2, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+ verify(connection3, times(0)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+
+ // Close first connection: heart beat should still be scheduled.
+ listeners1.get(0).handleConnectionClosed();
+ assertThat(scheduler.isScheduled).isTrue();
+ assertThat(listeners1).isEmpty();
+
+ // Check heart-beat only sent to second connection.
+ scheduler.command.run();
+ verify(connection1, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+ verify(connection2, times(2)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+ verify(connection3, times(0)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+
+ // Close second connection: heart beat should now be stopped.
+ listeners2.get(0).handleConnectionClosed();
+ assertThat(scheduler.isScheduled).isFalse();
+ assertThat(listeners2).isEmpty();
+
+ // Opening another connection should restart the heart beat.
+ hbcf.getConnection();
+ assertThat(scheduler.isScheduled).isTrue();
+ assertThat(scheduler.command).isNotNull();
+ assertThat(scheduler.delay).isEqualTo(0);
+ assertThat(scheduler.unit).isEqualTo(TimeUnit.MILLISECONDS);
+ assertThat(listeners3).hasSize(1);
+
+ // Check heart-beat only sent to third connection.
+ scheduler.command.run();
+ verify(connection1, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+ verify(connection2, times(2)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+ verify(connection3, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+ }
+
+ private Connection heartBeat(final Connection mockConnection, final ResultCode resultCode) {
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(final InvocationOnMock invocation) throws Throwable {
+ final SearchResultHandler handler =
+ (SearchResultHandler) invocation.getArguments()[2];
+ if (resultCode.isExceptional()) {
+ handler.handleErrorResult(newErrorResult(resultCode));
+ } else {
+ handler.handleResult(newResult(resultCode));
+ }
+ return null;
+ }
+ }).when(mockConnection).searchAsync(any(SearchRequest.class),
+ any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
+ return mockConnection;
+ }
+
+ // private void sleepFor(final long ms) {
+ // // Avoid premature wake-ups.
+ // final long until = System.currentTimeMillis() + ms;
+ // do {
+ // try {
+ // Thread.sleep(until - System.currentTimeMillis());
+ // } catch (final InterruptedException e) {
+ // Thread.currentThread().interrupt();
+ // return;
+ // }
+ // } while (System.currentTimeMillis() < until);
+ // }
+}
--
Gitblit v1.10.0