/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.plugin; import java.io.IOException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.TestCaseUtils; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.std.server.SynchronizationProviderCfg; import org.opends.server.api.SynchronizationProvider; import org.opends.server.core.DirectoryServer; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.CSNGenerator; import org.opends.server.replication.common.DSInfo; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.*; import 
org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.types.Attribute; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.opends.server.TestCaseUtils.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.testng.Assert.*; /** * Some tests to go through the DS state machine and validate we get the * expected status according to the actions we perform. */ @SuppressWarnings("javadoc") public class StateMachineTest extends ReplicationTestCase { private static final String EXAMPLE_DN = "dc=example,dc=com"; // Server id definitions private static DN EXAMPLE_DN_; private static final int DS1_ID = 1; private static final int DS2_ID = 2; private static final int DS3_ID = 3; private static final int RS1_ID = 41; private int rs1Port = -1; private LDAPReplicationDomain ds1; private ReplicationBroker ds2; private ReplicationBroker ds3; private ReplicationServer rs1; /** The tracer object for the debug logger */ private static final DebugTracer TRACER = getTracer(); private int initWindow = 100; private void debugInfo(String s) { logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); if (debugEnabled()) { TRACER.debugInfo("** TEST **" + s); } } private void initTest() throws IOException { rs1Port = -1; ds1 = null; ds2 = null; ds3 = null; rs1Port = TestCaseUtils.findFreePort(); } private void endTest() throws Exception { if (ds1 != null) { ds1.shutdown(); ds1 = null; } // Clear any reference to a domain in synchro plugin MultimasterReplication.deleteDomain(EXAMPLE_DN_); stop(ds2, ds3); ds2 = ds3 = null; remove(rs1); rs1 = null; 
rs1Port = -1; } /** * Check connection of the provided ds to the * replication server. Waits for connection to be ok up to secTimeout seconds * before failing. */ private void checkConnection(int secTimeout, int dsId) throws Exception { ReplicationBroker rb = null; LDAPReplicationDomain rd = null; switch (dsId) { case DS1_ID: rd = ds1; break; case DS2_ID: rb = ds2; break; case DS3_ID: rb = ds3; break; default: fail("Unknown ds server id."); } int nSec = 0; // Go out of the loop only if connection is verified or if timeout occurs while (true) { // Test connection boolean connected = false; if (rd != null) connected = rd.isConnected(); else connected = rb.isConnected(); if (connected) { // Connection verified debugInfo("checkConnection: connection of DS " + dsId + " to RS obtained after " + nSec + " seconds."); return; } Thread.sleep(100); nSec++; // Timeout reached, end with error assertFalse(nSec > secTimeout * 10, "checkConnection: DS " + dsId + " is not connected to the RS after " + secTimeout + " seconds."); } } /** * Creates a new ReplicationServer. 
*/ private ReplicationServer createReplicationServer(String testCase, int degradedStatusThreshold) throws Exception { SortedSet replServers = new TreeSet(); String dir = "stateMachineTest" + RS1_ID + testCase + "Db"; ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(rs1Port, dir, 0, RS1_ID, 0, 100, replServers, 1, 1000, degradedStatusThreshold); return new ReplicationServer(conf); } /** * Creates and starts a new ReplicationDomain configured for the replication * server */ @SuppressWarnings("unchecked") private LDAPReplicationDomain createReplicationDomain(int dsId) throws Exception { SortedSet replServers = new TreeSet(); replServers.add("localhost:" + rs1Port); DomainFakeCfg domainConf = new DomainFakeCfg(EXAMPLE_DN_, dsId, replServers); LDAPReplicationDomain replicationDomain = MultimasterReplication.createNewDomain(domainConf); replicationDomain.start(); SynchronizationProvider provider = DirectoryServer.getSynchronizationProviders().get(0); if (provider instanceof ConfigurationChangeListener) { ConfigurationChangeListener mmr = (ConfigurationChangeListener) provider; mmr.applyConfigurationChange(new MultimasterReplicationFakeConf()); } return replicationDomain; } /** * Create and connect a replication broker to the replication server with the * given state and generation id */ private ReplicationBroker createReplicationBroker(int dsId, ServerState state, long generationId) throws Exception { SortedSet replServers = newSortedSet("localhost:" + rs1Port); DomainFakeCfg fakeCfg = new DomainFakeCfg(EXAMPLE_DN_, dsId, replServers); fakeCfg.setHeartbeatInterval(0); fakeCfg.setChangetimeHeartbeatInterval(500); ReplSessionSecurity security = new ReplSessionSecurity(null, null, null, true); ReplicationBroker broker = new ReplicationBroker( new DummyReplicationDomain(generationId), state, fakeCfg, generationId, security); broker.start(); checkConnection(30, broker, rs1Port); return broker; } /** * Make simple state machine test. 
* * NC = Not connected status * N = Normal status * D = Degraded status * FU = Full update status * BG = Bad generation id status * * The test path should be: * ->NC->N->NC * @throws Exception If a problem occurred */ @Test(enabled=true) public void testStateMachineBasic() throws Exception { String testCase = "testStateMachineBasic"; debugInfo("Starting " + testCase); initTest(); try { /** * DS1 start, no RS available: DS1 should be in not connected status */ ds1 = createReplicationDomain(DS1_ID); sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS); /** * RS1 starts , DS1 should connect to it and be in normal status */ rs1 = createReplicationServer(testCase, 5000); sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS); /** * RS1 stops, DS1 should go in not connected status */ rs1.remove(); sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS); } finally { endTest(); } } /** Returns various init values for test testStateMachineStatusAnalyzer */ @DataProvider(name="stateMachineStatusAnalyzerTestProvider") public Object [][] stateMachineStatusAnalyzerTestProvider() throws Exception { return new Object [][] { {1} , {10}, {50}, {120} }; } /** * Test the status analyzer system that allows to go from normal to degraded * and vice versa, using the configured threshold value * * NC = Not connected status * N = Normal status * D = Degraded status * FU = Full update status * BG = Bad generation id status * * Expected path: * ->NC->N->D->N->NC * @throws Exception If a problem occurred */ @Test(enabled=true, groups="slow", dataProvider="stateMachineStatusAnalyzerTestProvider") public void testStateMachineStatusAnalyzer(int thresholdValue) throws Throwable { String testCase = "testStateMachineStatusAnalyzer with threhold " + thresholdValue; debugInfo("Starting " + testCase + " with " + thresholdValue); initTest(); BrokerReader br3 = null; BrokerReader br2 = null; BrokerWriter bw = null; try { /** * RS1 starts with specified threshold value */ 
rs1 = createReplicationServer(testCase, thresholdValue); /** * DS2 starts and connects to RS1. No reader and low window value at the * beginning so writer for DS2 in RS should enqueue changes after first * changes sent to DS. (window value reached: a window msg needed by RS for * following sending changes to DS) */ ds2 = createReplicationBroker(DS2_ID, new ServerState(), EMPTY_DN_GENID); checkConnection(30, DS2_ID); /** * DS3 starts and connects to RS1 */ ds3 = createReplicationBroker(DS3_ID, new ServerState(), EMPTY_DN_GENID); br3 = new BrokerReader(ds3, DS3_ID); checkConnection(30, DS3_ID); // Send first changes to reach window and block DS2 writer queue. Writer will take them // from queue and block (no more changes removed from writer queue) after // having sent them to TCP receive queue of DS2. bw = new BrokerWriter(ds3, DS3_ID, false); bw.followAndPause(11); /** * DS3 sends changes (less than threshold): DS2 should still be in normal * status so no topo message should be sent (update topo message * for telling status of DS2 changed) */ int nChangesSent = 0; if (thresholdValue > 1) { nChangesSent = thresholdValue - 1; bw.followAndPause(nChangesSent); Thread.sleep(1000); // Be sure status analyzer has time to test ReplicationMsg msg = br3.getLastMsg(); debugInfo(testCase + " Step 1: last message from writer: " + msg); assertTrue(msg == null, (msg != null) ? msg.toString() : "null" ); } /** * DS3 sends changes to reach the threshold value, DS3 should receive an * update topo message with status of DS2: degraded status */ bw.followAndPause(thresholdValue - nChangesSent); // wait for a status MSG status analyzer to broker 3 waitForDegradedStatusOnBroker3(); /** * DS3 sends 10 additional changes after threshold value, DS2 should still be * degraded so no topo message received. 
*/ bw.followAndPause(10); bw.shutdown(); Thread.sleep(1000); // Be sure status analyzer has time to test ReplicationMsg lastMsg = br3.getLastMsg(); ReplicationMsg msg = br3.getLastMsg(); debugInfo(testCase + " Step 3: last message from writer: " + msg); assertTrue(lastMsg == null); /** * DS2 replays every changes and should go back to normal status * (create a reader to emulate replay of messages (messages read from queue)) */ br2 = new BrokerReader(ds2, DS2_ID); // wait for a status MSG status analyzer to broker 3 waitForDegradedStatusOnBroker3(); } finally { endTest(); if (bw != null) bw.shutdown(); if (br3 != null) br3.shutdown(); if (br2 != null) br2.shutdown(); } } private void waitForDegradedStatusOnBroker3() throws InterruptedException { for (int count = 0; count< 50; count++) { List dsList = ds3.getDsList(); DSInfo ds3Info = null; if (dsList.size() > 0) { ds3Info = dsList.get(0); } if (ds3Info != null && ds3Info.getDsId() == DS2_ID && ds3Info.getStatus() == ServerStatus.DEGRADED_STATUS) { break; } assertTrue(count < 50, "DS2 did not get degraded : " + ds3Info); Thread.sleep(200); // Be sure status analyzer has time to test } } /** * Go through the possible state machine transitions: * * NC = Not connected status * N = Normal status * D = Degraded status * FU = Full update status * BG = Bad generation id status * * The test path should be: * ->NC->D->N->NC->N->D->NC->D->N->BG->NC->N->D->BG->FU->NC->N->D->FU->NC->BG->NC->N->FU->NC->N->NC * @throws Exception If a problem occurred */ @Test(enabled = false, groups = "slow") public void testStateMachineFull() throws Exception { String testCase = "testStateMachineFull"; debugInfo("Starting " + testCase); initTest(); BrokerReader br = null; BrokerWriter bw = null; try { int DEGRADED_STATUS_THRESHOLD = 1; // RS1 starts with 1 message as degraded status threshold value rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD); // DS2 starts and connects to RS1 ds2 = createReplicationBroker(DS2_ID, new 
ServerState(), EMPTY_DN_GENID);
      br = new BrokerReader(ds2, DS2_ID);
      checkConnection(30, DS2_ID);

      // DS2 starts sending a lot of changes
      bw = new BrokerWriter(ds2, DS2_ID, false);
      bw.follow();
      Thread.sleep(1000); // Let some messages be queued in RS

      /**
       * DS1 starts and connects to RS1, server state exchange should lead to
       * start in degraded status as some changes should be queued in the RS
       * and the threshold value is 1 change in queue.
       */
      ds1 = createReplicationDomain(DS1_ID);
      checkConnection(30, DS1_ID);
      sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);

      /**
       * DS2 stops sending changes: DS1 should replay pending changes and should
       * enter the normal status
       */
      bw.pause();
      // Sleep enough so that replay can be done and analyzer has time
      // to see that the queue length is now under the threshold value.
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);

      /**
       * RS1 stops to make DS1 go to not connected status (from normal status)
       */
      rs1.remove();
      sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);

      /**
       * DS2 restarts with up to date server state (this avoids the
       * restarting RS1 sending it updates it has already sent)
       */
      ds2.stop();
      bw.shutdown();
      br.shutdown();
      ServerState curState = ds1.getServerState();
      ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
      br = new BrokerReader(ds2, DS2_ID);

      /**
       * RS1 restarts, DS1 should get back to normal status
       */
      rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD);
      checkConnection(30, DS2_ID);
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);

      /**
       * DS2 sends again a lot of changes to make DS1 degraded again
       */
      bw = new BrokerWriter(ds2, DS2_ID, false);
      bw.follow();
      Thread.sleep(8000); // Let some messages be queued in RS, and let the analyzer see the change
      sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);

      /**
       * RS1 stops to make DS1 go to not connected status (from degraded status)
       */
      rs1.remove();
      bw.pause();
      sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);

      /**
       * DS2 restarts with up to date server state (this avoids the
       * restarting RS1 sending it updates it has already sent)
       */
      ds2.stop();
      bw.shutdown();
      br.shutdown();
      curState = ds1.getServerState();
      ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
      br = new BrokerReader(ds2, DS2_ID);

      /**
       * RS1 restarts, DS1 should reconnect in degraded status (from not connected
       * this time, not from state machine entry)
       */
      rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD);
      // It is too difficult to tune the right sleep so disabling this test:
      // Sometimes the status analyzer may be fast and quickly change the status
      // of DS1 to NORMAL_STATUS
      //sleep(2000);
      //sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
      checkConnection(30, DS2_ID);

      /**
       * DS1 should come back in normal status after a while
       */
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);

      /**
       * DS2 sends a reset gen id order with wrong gen id: DS1 should go into bad generation id status
       */
      long BAD_GEN_ID = 999999L;
      resetGenId(ds2, BAD_GEN_ID); // ds2 will also go bad gen
      sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS);

      /**
       * DS2 sends again a reset gen id order with right id: DS1 should be disconnected
       * by RS then reconnect and enter again in normal status. This goes through
       * not connected status but not possible to check as should reconnect immediately
       */
      resetGenId(ds2, EMPTY_DN_GENID); // ds2 will also be disconnected
      ds2.stop();
      br.shutdown(); // Reader could reconnect broker, but gen id would be bad: need to recreate a broker to send changes
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);

      /**
       * DS2 sends again a lot of changes to make DS1 degraded again
       */
      curState = ds1.getServerState();
      ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
      checkConnection(30, DS2_ID);
      bw = new BrokerWriter(ds2, DS2_ID, false);
      br = new BrokerReader(ds2, DS2_ID);
      bw.follow();
      Thread.sleep(8000); // Let some messages be queued in RS, and let the analyzer see the change
      sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);

      /**
       * DS2 sends reset gen id order with bad gen id: DS1 should go in bad gen id
       * status (from degraded status this time)
       */
      resetGenId(ds2, -1); // -1 to allow next step full update and flush RS db so that DS1 can reconnect after full update
      sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS);
      bw.pause();

      /**
       * DS2 engages full update (while DS1 in bad gen id status), DS1 should go
       * in full update status
       */
      BrokerInitializer bi = new BrokerInitializer(ds2, DS2_ID, false);
      bi.initFullUpdate(DS1_ID, 200);
      sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS);

      /**
       * DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
       * and come back to normal status (RS genid was -1 so RS will adopt the new gen id)
       */
      bi.runFullUpdate();
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);

      /**
       * DS2 sends changes to DS1: DS1 should go in degraded status
       */
      ds2.stop(); // a new broker is needed to restart DS2 with another gen id
      bw.shutdown();
      br.shutdown();
      long newGen = ds1.getGenerationID();
      curState = ds1.getServerState();
      ds2 = createReplicationBroker(DS2_ID, curState, newGen);
      checkConnection(30, DS2_ID);
      bw = new BrokerWriter(ds2, DS2_ID, false);
      br = new BrokerReader(ds2, DS2_ID);
      bw.follow();
      Thread.sleep(8000); // Let some messages be queued in RS, and let the analyzer see the change
      sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);

      /**
       * DS2 engages full update (while DS1 in degraded status), DS1 should go
       * in full update status
       */
      bi = new BrokerInitializer(ds2, DS2_ID, false);
      bi.initFullUpdate(DS1_ID, 300);
      sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS);
      bw.pause();

      /**
       * DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
       * and come back to bad gen id status (RS genid was another gen id (300 entries instead of 200))
       */
      bi.runFullUpdate();
      sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS);

      /**
       * DS2 sends reset gen id with gen id same as DS1: DS1 will be disconnected
       * by RS (not connected status) and come back to normal status
       */
      ds2.stop(); // a new broker is needed to restart DS2 with another gen id
      bw.shutdown();
      br.shutdown();
      newGen = ds1.getGenerationID();
      curState = ds1.getServerState();
      ds2 = createReplicationBroker(DS2_ID, curState, newGen);
      checkConnection(30, DS2_ID);
      br = new BrokerReader(ds2, DS2_ID);
      resetGenId(ds2, newGen); // Make DS1 reconnect in normal status
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);

      /**
       * DS2 engages full update (while DS1 in normal status), DS1 should go
       * in full update status
       */
      bi = new BrokerInitializer(ds2, DS2_ID, false);
      bi.initFullUpdate(DS1_ID, 300); // 300 entries will compute same genid of the RS
      sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS);

      /**
       * DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
       * and come back to normal status (process full update with same data as
       * before so RS already has right gen id: version with 300 entries)
       */
      bi.runFullUpdate();
      ds2.stop();
      br.shutdown();
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);

      /**
       * RS1 stops,
DS1 should go to not connected status */ rs1.remove(); sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS); } finally { // Finalize test endTest(); if (bw != null) bw.shutdown(); if (br != null) br.shutdown(); } } /** * Set up the environment. * * @throws Exception * If the environment could not be set up. */ @BeforeClass @Override public void setUp() throws Exception { super.setUp(); EXAMPLE_DN_ = DN.valueOf(EXAMPLE_DN); // Note: this test does not use the memory test backend as for having a DS // going into degraded status, we need to send a lot of updates. This makes // the memory test backend crash with OutOfMemoryError. So we prefer here // a backend backed up with a file TestCaseUtils.clearJEBackend(false, "userRoot", EXAMPLE_DN); } /** * Clean up the environment. * * @throws Exception If the environment could not be set up. */ @AfterClass @Override public void classCleanUp() throws Exception { callParanoiaCheck = false; super.classCleanUp(); TestCaseUtils.clearJEBackend(false, "userRoot", EXAMPLE_DN); paranoiaCheck(); } /** * Sends a reset genid message through the given replication broker, with the * given new generation id */ private void resetGenId(ReplicationBroker rb, long newGenId) { ResetGenerationIdMsg resetMsg = new ResetGenerationIdMsg(newGenId); rb.publish(resetMsg); } /** * Utility class for making a full update through a broker. 
No separated thread * Usage: * BrokerInitializer bi = new BrokerInitializer(rb, sid, nEntries); * bi.initFullUpdate(); // Initializes a full update session by sending InitializeTargetMsg * bi.runFullUpdate(); // loops sending nEntries entries and finalizes the full update by sending the EntryDoneMsg */ private class BrokerInitializer { private ReplicationBroker rb = null; private int serverId = -1; private long userId = 0; private int destId = -1; // Server id of server to initialize private long nEntries = -1; // Number of entries to send to dest private boolean createReader = false; /** * If the BrokerInitializer is to be used for a lot of entries to send * (which is often the case), the reader thread should be enabled to make * the window subsystem work and allow the broker to send as much entries as * he wants. If not enabled, the user is responsible to call the receive * method of the broker himself. */ private BrokerReader reader = null; /** * Creates a broker initializer. Also creates a reader according to request */ public BrokerInitializer(ReplicationBroker rb, int serverId, boolean createReader) { this.rb = rb; this.serverId = serverId; this.createReader = createReader; } /** * Initializes a full update session by sending InitializeTargetMsg */ public void initFullUpdate(int destId, long nEntries) { // Also create reader ? 
if (createReader) { reader = new BrokerReader(rb, serverId); } debugInfo("Broker " + serverId + " initializer sending InitializeTargetMsg to server " + destId); this.destId = destId; this.nEntries = nEntries; // Send init msg to warn dest server it is going do be initialized RoutableMsg initTargetMsg = new InitializeTargetMsg( EXAMPLE_DN_, serverId, destId, serverId, nEntries, initWindow); rb.publish(initTargetMsg); // Send top entry for the domain String topEntry = "dn: " + EXAMPLE_DN + "\n" + "objectClass: top\n" + "objectClass: domain\n" + "dc: example\n" + "entryUUID: 11111111-1111-1111-1111-111111111111\n\n"; EntryMsg entryMsg = new EntryMsg(serverId, destId, topEntry.getBytes(), 1); rb.publish(entryMsg); } private EntryMsg createNextEntryMsg() { String userEntryUUID = "11111111-1111-1111-1111-111111111111"; long curId = ++userId; String userdn = "uid=full_update_user" + curId + "," + EXAMPLE_DN; String entryWithUUIDldif = "dn: " + userdn + "\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + "objectClass: inetOrgPerson\n" + "uid: full_update_user" + curId + "\n" + "homePhone: 951-245-7634\n" + "description: This is the description for Aaccf Amar.\n" + "st: NC\n" + "mobile: 027-085-0537\n" + "postalAddress: Aaccf Amar$17984 Thirteenth Street" + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n" + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n" + "street: 17984 Thirteenth Street\n" + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n" + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n" + "userPassword: password\n" + "initials: AA\n" + "entryUUID: " + userEntryUUID + "\n\n"; // -> WARNING: EntryMsg PDUs are concatenated before calling import on LDIF // file so need \n\n to separate LDIF entries to conform to LDIF file format // Create an entry message return new EntryMsg(serverId, destId, entryWithUUIDldif.getBytes(), (int) userId); } /** * Loops sending entries for full update 
(EntryMsg messages). When * terminates, sends the EntryDoneMsg to finalize full update. Number of * sent entries is determined at initFullUpdate call time. */ public void runFullUpdate() { debugInfo("Broker " + serverId + " initializer starting sending entries to server " + destId); for(long i = 0 ; i blocking call */ public void followAndPause(int nChanges) { debugInfo("Requested broker writer " + serverId + " to write " + nChanges + " change(s)."); pause(); // If however we were already working // Initialize counter system variables nChangesSent = 0; nChangesSentLimit = nChanges; careAboutAmountOfChanges = true; // Start session sessionDone.set(false); suspended.set(false); // Wait for all messages sent while (!sessionDone.get()) { TestCaseUtils.sleep(1000); } careAboutAmountOfChanges = false; } private AddMsg createNextAddMsg() { String userEntryUUID = "11111111-1111-1111-1111-111111111111"; long curId = userId++; String userdn = "uid=user" + curId + "," + EXAMPLE_DN; String entryWithUUIDldif = "dn: " + userdn + "\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + "objectClass: inetOrgPerson\n" + "uid: user" + curId + "\n" + "homePhone: 951-245-7634\n" + "description: This is the description for Aaccf Amar.\n" + "st: NC\n" + "mobile: 027-085-0537\n" + "postalAddress: Aaccf Amar$17984 Thirteenth Street" + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n" + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n" + "street: 17984 Thirteenth Street\n" + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n" + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n" + "userPassword: password\n" + "initials: AA\n" + "entryUUID: " + userEntryUUID + "\n"; Entry personWithUUIDEntry = null; try { personWithUUIDEntry = TestCaseUtils.entryFromLdifString( entryWithUUIDldif); } catch (Exception e) { throw new RuntimeException(e); } // Create an update message to add an entry. 
return new AddMsg(gen.newCSN(), personWithUUIDEntry.getName(), userEntryUUID, null, personWithUUIDEntry.getObjectClassAttribute(), personWithUUIDEntry.getAttributes(), new ArrayList()); } } /** * This simple reader just throws away the received * messages. It is used on a breaker we want to be able to send or read from some message * with (changes, entries (full update)...). Calling the receive method of the * broker allows to unblock the window mechanism and to send the desired messages. * Calling the updateWindowAfterReplay method allows to send when necessary the * window message to the RS to allow him send other messages he may want to send us. */ private class BrokerReader extends Thread { private ReplicationBroker rb; private int serverId = -1; private boolean shutdown; private ReplicationMsg lastMsg; public BrokerReader(ReplicationBroker rb, int serverId) { super("BrokerReader for broker " + serverId); this.rb = rb; this.serverId = serverId; start(); } /** Loop reading and throwing update messages */ @Override public void run() { while (!shutdown) { try { ReplicationMsg msg = rb.receive(); // Allow more messages to be sent by broker writer rb.updateWindowAfterReplay(); // Allow RS to send more messages to broker if (msg != null) debugInfo("Broker " + serverId + " reader received: " + msg); lastMsg = msg; } catch (SocketTimeoutException ex) { if (shutdown) return; } } debugInfo("Broker " + serverId + " reader thread is dying"); } /** * Returns last received message from reader When read, last value is * cleared */ public ReplicationMsg getLastMsg() { ReplicationMsg toReturn = lastMsg; lastMsg = null; return toReturn; } /** Stops reader thread */ public void shutdown() { shutdown = true; try { join(); } catch (InterruptedException ex) { /* Don't care */ } } } /** * Waits for a long time for an equality condition to be true. * Every second, the equality check is performed. 
After the provided amount of * seconds, if the equality is false, an assertion error is raised. * This methods ends either because the equality is true or if the timeout * occurs after the provided number of seconds. * This method is convenient when the the equality can only occur after a * period of time which is difficult to establish, but we know it will occur * anyway. This has 2 advantages compared to a classical code like this: * - sleep(some time); * - assertEquals(testedValue, expectedValue); * 1. If the sleep value is too big, this will impact the total time of * running tests uselessly. It may also penalize a fast running machine where * the sleep time value may be unnecessarily to long. * 2. If the sleep value is too small, some slow machines may have the test * fail whereas some additional time would have made the test succeed. * @param secTimeout Number of seconds to wait before failing. The value for * this should be high. A timeout is needed anyway to have the test campaign * finish anyway. * @param testedValue The value we want to test * @param expectedValue The value the tested value should be equal to */ private void sleepAssertStatusEquals(int secTimeout, LDAPReplicationDomain testedValue, ServerStatus expectedValue) throws Exception { assertTrue(testedValue != null && expectedValue != null, "sleepAssertStatusEquals: null parameters"); // Go out of the loop only if equality is obtained or if timeout occurs int nSec = 0; while (true) { Thread.sleep(1000); nSec++; // Test equality of values if (testedValue.getStatus().equals(expectedValue)) { debugInfo("sleepAssertStatusEquals: equality obtained after " + nSec + " seconds (" + expectedValue + ")."); return; } // Timeout reached, end with error assertTrue(nSec < secTimeout, "sleepAssertStatusEquals: got <" + testedValue.getStatus() + "> where expected <" + expectedValue + ">"); } } }