/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2013 ForgeRock AS. */ package org.forgerock.opendj.ldap; import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Fail.fail; import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult; import static org.forgerock.opendj.ldap.SearchScope.BASE_OBJECT; import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnection; import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnectionFactory; import static org.forgerock.opendj.ldap.TestCaseUtils.mockTimeSource; import static org.forgerock.opendj.ldap.requests.Requests.newModifyRequest; import static org.forgerock.opendj.ldap.requests.Requests.newSearchRequest; import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest; import static org.forgerock.opendj.ldap.responses.Responses.newResult; import static org.mockito.Matchers.any; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import org.forgerock.opendj.ldap.requests.BindRequest; import org.forgerock.opendj.ldap.requests.SearchRequest; import org.forgerock.opendj.ldap.responses.Result; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.forgerock.opendj.util.CompletedFutureResult; import com.forgerock.opendj.util.StaticUtils; /** * Tests the connection pool implementation.. */ @SuppressWarnings("javadoc") public class HeartBeatConnectionFactoryTestCase extends SdkTestCase { // @formatter:off /* * Key features which need testing: * * - lazy scheduler registration and deregistration * - scheduled task only sends heart-beat when connection is open * - connection remains valid when any response is received * - connection remains valid when a heart beat response is received * - connection becomes invalid if no response is received during timeout * - heart beat only sent when connection is idle * - slow bind / startTLS prevents heart beat * - slow heart beat prevents bind / start TLS * - support concurrent bind / start TLS */ // @formatter:on private static final SearchRequest HEARTBEAT = newSearchRequest("dc=test", BASE_OBJECT, "(objectclass=*)", "1.1"); private Connection connection; private ConnectionFactory factory; private Connection hbc; private HeartBeatConnectionFactory hbcf; private List listeners; private MockScheduler scheduler; /** * Disables logging before the tests. */ @BeforeClass() public void disableLogging() { StaticUtils.DEBUG_LOG.setLevel(Level.SEVERE); } /** * Re-enable logging after the tests. */ @AfterClass() public void enableLogging() { StaticUtils.DEBUG_LOG.setLevel(Level.INFO); } @AfterMethod(alwaysRun = true) public void tearDown() { try { if (hbc != null) { hbc.close(); assertThat(hbc.isValid()).isFalse(); assertThat(hbc.isClosed()).isTrue(); } scheduler.runAllTasks(); // Flush any remaining timeout tasks. assertThat(scheduler.isScheduled()).isFalse(); // No more connections to check. hbcf.close(); } finally { connection = null; factory = null; hbcf = null; listeners = null; scheduler = null; hbc = null; } } @SuppressWarnings("unchecked") @Test public void testBindWhileHeartBeatInProgress() throws Exception { // Mock connection with successful heartbeat. init(ResultCode.SUCCESS); // Get a connection. hbc = hbcf.getConnection(); /* * Send a heartbeat, trapping the search call-back so that we can send * the response once we have attempted a bind. */ when( connection.searchAsync(any(SearchRequest.class), any(IntermediateResponseHandler.class), any(SearchResultHandler.class))) .thenReturn(null); when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L); scheduler.runAllTasks(); // Send the heartbeat. // Capture the heartbeat search result handler. final ArgumentCaptor arg = ArgumentCaptor.forClass(SearchResultHandler.class); verify(connection, times(2)).searchAsync(same(HEARTBEAT), any(IntermediateResponseHandler.class), arg.capture()); assertThat(hbc.isValid()).isTrue(); // Not checked yet. /* * Now attempt a bind request, which should be held in a queue until the * heart beat completes. */ hbc.bindAsync(newSimpleBindRequest(), null, null); verify(connection, times(0)).bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class), any(ResultHandler.class)); // Send fake heartbeat response, releasing the bind request. arg.getValue().handleResult(newResult(ResultCode.SUCCESS)); verify(connection, times(1)).bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class), any(ResultHandler.class)); } @Test public void testGetConnection() throws Exception { // Mock connection with successful initial heartbeat. init(ResultCode.SUCCESS); hbc = hbcf.getConnection(); assertThat(hbc).isNotNull(); assertThat(hbc.isValid()).isTrue(); } @Test public void testGetConnectionAsync() throws Exception { // Mock connection with successful initial heartbeat. init(ResultCode.SUCCESS); hbc = hbcf.getConnectionAsync(null).get(); assertThat(hbc).isNotNull(); assertThat(hbc.isValid()).isTrue(); } @Test public void testGetConnectionAsyncWithInitialHeartBeatError() throws Exception { // Mock connection with failing initial heartbeat. init(ResultCode.BUSY); try { hbcf.getConnectionAsync(null).get(); fail("Unexpectedly obtained a connection"); } catch (final ErrorResultException e) { checkInitialHeartBeatFailure(e); } } @Test public void testGetConnectionWithInitialHeartBeatError() throws Exception { // Mock connection with failing initial heartbeat. init(ResultCode.BUSY); try { hbcf.getConnection(); fail("Unexpectedly obtained a connection"); } catch (final ErrorResultException e) { checkInitialHeartBeatFailure(e); } } @Test public void testHeartBeatSucceedsThenFails() throws Exception { // Mock connection with successful heartbeat. init(ResultCode.SUCCESS); // Get a connection and check that it was pinged. hbc = hbcf.getConnection(); verifyHeartBeatSent(connection, 1); assertThat(scheduler.isScheduled()).isTrue(); // heartbeater assertThat(listeners).hasSize(1); assertThat(hbc.isValid()).isTrue(); // Invoke heartbeat before the connection is considered idle. scheduler.runAllTasks(); verifyHeartBeatSent(connection, 1); // No heartbeat sent - not idle yet. assertThat(hbc.isValid()).isTrue(); // Invoke heartbeat after the connection is considered idle. when(hbcf.timeSource.currentTimeMillis()).thenReturn(6000L); scheduler.runAllTasks(); verifyHeartBeatSent(connection, 2); // Heartbeat sent. assertThat(hbc.isValid()).isTrue(); // Now force the heartbeat to fail. mockHeartBeatResponse(connection, listeners, ResultCode.CLIENT_SIDE_SERVER_DOWN); when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L); scheduler.runAllTasks(); verifyHeartBeatSent(connection, 3); assertThat(hbc.isValid()).isFalse(); // Flush redundant timeout tasks. scheduler.runAllTasks(); // Attempt to send a new request: it should fail immediately. @SuppressWarnings("unchecked") final ResultHandler mockHandler = mock(ResultHandler.class); hbc.modifyAsync(newModifyRequest(DN.rootDN()), null, mockHandler); final ArgumentCaptor arg = ArgumentCaptor.forClass(ErrorResultException.class); verify(mockHandler).handleErrorResult(arg.capture()); assertThat(arg.getValue().getResult().getResultCode()).isEqualTo( ResultCode.CLIENT_SIDE_SERVER_DOWN); assertThat(hbc.isValid()).isFalse(); assertThat(hbc.isClosed()).isFalse(); } @Test public void testHeartBeatTimeout() throws Exception { // Mock connection with successful heartbeat. init(ResultCode.SUCCESS); // Get a connection and check that it was pinged. hbc = hbcf.getConnection(); // Now force the heartbeat to fail due to timeout. mockHeartBeatResponse(connection, listeners, null /* no response */); when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L); scheduler.runAllTasks(); // Send the heartbeat. verifyHeartBeatSent(connection, 2); assertThat(hbc.isValid()).isTrue(); // Not checked yet. when(hbcf.timeSource.currentTimeMillis()).thenReturn(12000L); scheduler.runAllTasks(); // Check for heartbeat. assertThat(hbc.isValid()).isFalse(); // Now invalid. assertThat(hbc.isClosed()).isFalse(); } @SuppressWarnings("unchecked") @Test public void testHeartBeatWhileBindInProgress() throws Exception { // Mock connection with successful heartbeat. init(ResultCode.SUCCESS); // Get a connection. hbc = hbcf.getConnection(); /* * Send a bind request, trapping the bind call-back so that we can send * the response once we have attempted a heartbeat. */ when( connection.bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class), any(ResultHandler.class))) .thenReturn(null); hbc.bindAsync(newSimpleBindRequest(), null, null); // Capture the bind result handler. @SuppressWarnings("rawtypes") final ArgumentCaptor arg = ArgumentCaptor.forClass(ResultHandler.class); verify(connection, times(1)).bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class), arg.capture()); /* * Now attempt the heartbeat which should not happen because there is a * bind in progress. */ when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L); scheduler.runAllTasks(); // Attempt to send the heartbeat. verify(connection, times(1)).searchAsync(same(HEARTBEAT), any(IntermediateResponseHandler.class), any(SearchResultHandler.class)); // Send fake bind response, releasing the heartbeat. arg.getValue().handleResult(newResult(ResultCode.SUCCESS)); // Attempt to send a heartbeat again. when(hbcf.timeSource.currentTimeMillis()).thenReturn(16000L); scheduler.runAllTasks(); // Attempt to send the heartbeat. verify(connection, times(2)).searchAsync(same(HEARTBEAT), any(IntermediateResponseHandler.class), any(SearchResultHandler.class)); } @Test public void testToString() { init(ResultCode.SUCCESS); assertThat(hbcf.toString()).isNotNull(); } private void checkInitialHeartBeatFailure(final ErrorResultException e) { /* * Initial heartbeat failure should trigger connection exception with * heartbeat cause. */ assertThat(e).isInstanceOf(ConnectionException.class); assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_SERVER_DOWN); assertThat(e.getCause()).isInstanceOf(ErrorResultException.class); assertThat(((ErrorResultException) e.getCause()).getResult().getResultCode()).isEqualTo( ResultCode.BUSY); } private void init(final ResultCode initialHeartBeatResult) { // Mock connection with successful heartbeat. listeners = new LinkedList(); connection = mockConnection(listeners); when(connection.isValid()).thenReturn(true); mockHeartBeatResponse(connection, listeners, initialHeartBeatResult); // Underlying connection factory. factory = mockConnectionFactory(connection); // Create heart beat connection factory. scheduler = new MockScheduler(); hbcf = new HeartBeatConnectionFactory(factory, 10000, 100, TimeUnit.MILLISECONDS, HEARTBEAT, scheduler); // Set initial time stamp. hbcf.timeSource = mockTimeSource(0); } private Connection mockHeartBeatResponse(final Connection mockConnection, final List listeners, final ResultCode resultCode) { doAnswer(new Answer>() { @Override public FutureResult answer(final InvocationOnMock invocation) throws Throwable { if (resultCode == null) { return null; } final SearchResultHandler handler = (SearchResultHandler) invocation.getArguments()[2]; if (resultCode.isExceptional()) { final ErrorResultException error = newErrorResult(resultCode); if (handler != null) { handler.handleErrorResult(error); } if (error instanceof ConnectionException) { for (final ConnectionEventListener listener : listeners) { listener.handleConnectionError(false, error); } } return new CompletedFutureResult(error); } else { final Result result = newResult(resultCode); if (handler != null) { handler.handleResult(result); } return new CompletedFutureResult(result); } } }).when(mockConnection).searchAsync(any(SearchRequest.class), any(IntermediateResponseHandler.class), any(SearchResultHandler.class)); return mockConnection; } private void verifyHeartBeatSent(final Connection connection, final int times) { verify(connection, times(times)).searchAsync(same(HEARTBEAT), any(IntermediateResponseHandler.class), any(SearchResultHandler.class)); } }