| | |
| | | import java.util.ArrayList; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Callable; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.DoneMsg; |
| | | import org.opends.server.replication.protocol.EntryMsg; |
| | | import org.opends.server.replication.protocol.InitializeTargetMsg; |
| | | import org.opends.server.replication.protocol.ReplSessionSecurity; |
| | | import org.opends.server.replication.protocol.ReplicationMsg; |
| | | import org.opends.server.replication.protocol.ResetGenerationIdMsg; |
| | | import org.opends.server.replication.protocol.RoutableMsg; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.service.ReplicationBroker; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.util.TestTimer; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static java.util.concurrent.TimeUnit.*; |
| | | |
| | | import static org.mockito.Mockito.*; |
| | | import static org.opends.server.util.CollectionUtils.*; |
| | | import static org.testng.Assert.*; |
| | |
| | | @SuppressWarnings("javadoc") |
| | | public class StateMachineTest extends ReplicationTestCase |
| | | { |
| | | |
| | | /** Server id definitions. */ |
| | | private static final String EXAMPLE_DN = "dc=example,dc=com"; |
| | | private static DN EXAMPLE_DN_; |
| | |
| | | } |
| | | } |
| | | |
| | | private static void shutdown(BrokerReader reader) |
| | | { |
| | | if (reader != null) |
| | | { |
| | | reader.shutdown(); |
| | | } |
| | | } |
| | | |
| | | private static void shutdown(BrokerWriter writer) |
| | | { |
| | | if (writer != null) |
| | | { |
| | | writer.shutdown(); |
| | | } |
| | | } |
| | | |
| | | private void initTest() throws IOException |
| | | { |
| | | rs1Port = -1; |
| | |
| | | rs1Port = -1; |
| | | } |
| | | |
| | | /** |
| | | * Check connection of the provided ds to the |
| | | * replication server. Waits for connection to be ok up to secTimeout seconds |
| | | * before failing. |
| | | */ |
| | | private void checkConnection(int secTimeout, int dsId) throws Exception |
| | | /** Waits until the provided ds is connected to the replication server. */ |
| | | private void waitUntiConnected(final int dsId) throws Exception |
| | | { |
| | | ReplicationBroker rb = null; |
| | | LDAPReplicationDomain rd = null; |
| | | TestTimer timer = new TestTimer.Builder() |
| | | .maxSleep(30, SECONDS) |
| | | .sleepTimes(100, MILLISECONDS) |
| | | .toTimer(); |
| | | timer.repeatUntilSuccess(new Callable<Void>() |
| | | { |
| | | @Override |
| | | public Void call() throws Exception |
| | | { |
| | | assertTrue(isConnected(dsId), "checkConnection: DS " + dsId + " is not connected to the RS"); |
| | | return null; |
| | | } |
| | | }); |
| | | } |
| | | |
| | | private boolean isConnected(final int dsId) |
| | | { |
| | | switch (dsId) |
| | | { |
| | | case DS1_ID: |
| | | rd = ds1; |
| | | break; |
| | | case DS2_ID: |
| | | rb = ds2; |
| | | break; |
| | | case DS3_ID: |
| | | rb = ds3; |
| | | break; |
| | | default: |
| | | fail("Unknown ds server id."); |
| | | } |
| | | |
| | | int nSec = 0; |
| | | |
| | | // Go out of the loop only if connection is verified or if timeout occurs |
| | | while (true) |
| | | { |
| | | // Test connection |
| | | boolean connected = false; |
| | | if (rd != null) |
| | | { |
| | | connected = rd.isConnected(); |
| | | } |
| | | else |
| | | { |
| | | connected = rb.isConnected(); |
| | | } |
| | | |
| | | if (connected) |
| | | { |
| | | // Connection verified |
| | | debugInfo("checkConnection: connection of DS " + dsId + |
| | | " to RS obtained after " + nSec + " seconds."); |
| | | return; |
| | | } |
| | | |
| | | Thread.sleep(100); |
| | | nSec++; |
| | | |
| | | // Timeout reached, end with error |
| | | assertFalse(nSec > secTimeout * 10, |
| | | "checkConnection: DS " + dsId + " is not connected to the RS after " |
| | | + secTimeout + " seconds."); |
| | | case DS1_ID: |
| | | return ds1.isConnected(); |
| | | case DS2_ID: |
| | | return ds2.isConnected(); |
| | | case DS3_ID: |
| | | return ds3.isConnected(); |
| | | default: |
| | | fail("Unknown ds server id."); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new ReplicationServer. |
| | | */ |
| | | /** Creates a new ReplicationServer. */ |
| | | private ReplicationServer createReplicationServer(String testCase, |
| | | int degradedStatusThreshold) throws Exception |
| | | { |
| | |
| | | return new ReplicationServer(conf); |
| | | } |
| | | |
| | | /** |
| | | * Creates and starts a new ReplicationDomain configured for the replication |
| | | * server. |
| | | */ |
| | | /** Creates and starts a new ReplicationDomain configured for the replication server. */ |
| | | @SuppressWarnings("unchecked") |
| | | private LDAPReplicationDomain createReplicationDomain(int dsId) throws Exception |
| | | { |
| | |
| | | |
| | | try |
| | | { |
| | | |
| | | /** |
| | | * DS1 start, no RS available: DS1 should be in not connected status |
| | | */ |
| | | // DS1 start, no RS available: DS1 should be in not connected status |
| | | ds1 = createReplicationDomain(DS1_ID); |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS); |
| | | |
| | | /** |
| | | * RS1 starts , DS1 should connect to it and be in normal status |
| | | */ |
| | | // RS1 starts , DS1 should connect to it and be in normal status |
| | | rs1 = createReplicationServer(testCase, 5000); |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS); |
| | | |
| | | /** |
| | | * RS1 stops, DS1 should go in not connected status |
| | | */ |
| | | // RS1 stops, DS1 should go in not connected status |
| | | rs1.remove(); |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS); |
| | | |
| | | waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS); |
| | | } finally |
| | | { |
| | | endTest(); |
| | |
| | | |
| | | try |
| | | { |
| | | /** |
| | | * RS1 starts with specified threshold value |
| | | */ |
| | | /* RS1 starts with specified threshold value */ |
| | | rs1 = createReplicationServer(testCase, thresholdValue); |
| | | |
| | | /** |
| | | * DS2 starts and connects to RS1. No reader and low window value at the |
| | | * beginning so writer for DS2 in RS should enqueue changes after first |
| | | * changes sent to DS. (window value reached: a window msg needed by RS for |
| | | * following sending changes to DS) |
| | | /* |
| | | * DS2 starts and connects to RS1. No reader and low window value at the beginning |
| | | * so writer for DS2 in RS should enqueue changes after first changes sent to DS. |
| | | * (window value reached: a window msg needed by RS for following sending changes to DS) |
| | | */ |
| | | ds2 = createReplicationBroker(DS2_ID, new ServerState(), EMPTY_DN_GENID); |
| | | checkConnection(30, DS2_ID); |
| | | waitUntiConnected(DS2_ID); |
| | | |
| | | /** |
| | | * DS3 starts and connects to RS1 |
| | | */ |
| | | /* DS3 starts and connects to RS1 */ |
| | | ds3 = createReplicationBroker(DS3_ID, new ServerState(), EMPTY_DN_GENID); |
| | | br3 = new BrokerReader(ds3, DS3_ID); |
| | | checkConnection(30, DS3_ID); |
| | | waitUntiConnected(DS3_ID); |
| | | |
| | | // Send first changes to reach window and block DS2 writer queue. Writer will take them |
| | | // from queue and block (no more changes removed from writer queue) after |
| | |
| | | bw = new BrokerWriter(ds3, DS3_ID, false); |
| | | bw.followAndPause(11); |
| | | |
| | | /** |
| | | * DS3 sends changes (less than threshold): DS2 should still be in normal |
| | | * status so no topo message should be sent (update topo message |
| | | * for telling status of DS2 changed) |
| | | /* |
| | | * DS3 sends changes (less than threshold): DS2 should still be in normal status |
| | | * so no topo message should be sent (update topo message for telling status of DS2 changed) |
| | | */ |
| | | int nChangesSent = 0; |
| | | if (thresholdValue > 1) |
| | |
| | | assertNull(msg, (msg != null) ? msg.toString() : "null"); |
| | | } |
| | | |
| | | /** |
| | | * DS3 sends changes to reach the threshold value, DS3 should receive an |
| | | * update topo message with status of DS2: degraded status |
| | | /* |
| | | * DS3 sends changes to reach the threshold value, |
| | | * DS3 should receive an update topo message with status of DS2: degraded status |
| | | */ |
| | | bw.followAndPause(thresholdValue - nChangesSent); |
| | | // wait for a status MSG status analyzer to broker 3 |
| | | waitForDegradedStatusOnBroker3(); |
| | | |
| | | /** |
| | | * DS3 sends 10 additional changes after threshold value, DS2 should still be |
| | | * degraded so no topo message received. |
| | | /* |
| | | * DS3 sends 10 additional changes after threshold value, |
| | | * DS2 should still be degraded so no topo message received. |
| | | */ |
| | | bw.followAndPause(10); |
| | | bw.shutdown(); |
| | | shutdown(bw); |
| | | Thread.sleep(1000); // Be sure status analyzer has time to test |
| | | ReplicationMsg lastMsg = br3.getLastMsg(); |
| | | ReplicationMsg msg = br3.getLastMsg(); |
| | | debugInfo(testCase + " Step 3: last message from writer: " + msg); |
| | | assertNull(lastMsg); |
| | | |
| | | /** |
| | | /* |
| | | * DS2 replays every changes and should go back to normal status |
| | | * (create a reader to emulate replay of messages (messages read from queue)) |
| | | */ |
| | | br2 = new BrokerReader(ds2, DS2_ID); |
| | | // wait for a status MSG status analyzer to broker 3 |
| | | waitForDegradedStatusOnBroker3(); |
| | | |
| | | } finally |
| | | { |
| | | endTest(); |
| | | if (bw != null) |
| | | { |
| | | bw.shutdown(); |
| | | } |
| | | if (br3 != null) |
| | | { |
| | | br3.shutdown(); |
| | | } |
| | | if (br2 != null) |
| | | { |
| | | br2.shutdown(); |
| | | } |
| | | shutdown(bw); |
| | | shutdown(br3); |
| | | shutdown(br2); |
| | | } |
| | | } |
| | | |
| | |
| | | // DS2 starts and connects to RS1 |
| | | ds2 = createReplicationBroker(DS2_ID, new ServerState(), EMPTY_DN_GENID); |
| | | br = new BrokerReader(ds2, DS2_ID); |
| | | checkConnection(30, DS2_ID); |
| | | waitUntiConnected(DS2_ID); |
| | | |
| | | // DS2 starts sending a lot of changes |
| | | bw = new BrokerWriter(ds2, DS2_ID, false); |
| | | bw.follow(); |
| | | Thread.sleep(1000); // Let some messages being queued in RS |
| | | |
| | | /** |
| | | * DS1 starts and connects to RS1, server state exchange should lead to |
| | | * start in degraded status as some changes should be in queued in the RS |
| | | * and the threshold value is 1 change in queue. |
| | | /* |
| | | * DS1 starts and connects to RS1, server state exchange should lead to start in degraded status |
| | | * as some changes should be in queued in the RS and the threshold value is 1 change in queue. |
| | | */ |
| | | ds1 = createReplicationDomain(DS1_ID); |
| | | checkConnection(30, DS1_ID); |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS); |
| | | waitUntiConnected(DS1_ID); |
| | | waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS); |
| | | |
| | | /** |
| | | * DS2 stops sending changes: DS1 should replay pending changes and should |
| | | * enter the normal status |
| | | */ |
| | | /* DS2 stops sending changes: DS1 should replay pending changes and should enter the normal status */ |
| | | bw.pause(); |
| | | // Sleep enough so that replay can be done and analyzer has time |
| | | // to see that the queue length is now under the threshold value. |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS); |
| | | |
| | | /** |
| | | * RS1 stops to make DS1 go to not connected status (from normal status) |
| | | */ |
| | | /* RS1 stops to make DS1 go to not connected status (from normal status) */ |
| | | rs1.remove(); |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS); |
| | | |
| | | /** |
| | | * DS2 restarts with up to date server state (this allows to have |
| | | * restarting RS1 not sending him some updates he already sent) |
| | | /* |
| | | * DS2 restarts with up to date server state |
| | | * (this allows to have restarting RS1 not sending him some updates he already sent) |
| | | */ |
| | | ds2.stop(); |
| | | bw.shutdown(); |
| | | br.shutdown(); |
| | | shutdown(bw); |
| | | shutdown(br); |
| | | ServerState curState = ds1.getServerState(); |
| | | ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID); |
| | | br = new BrokerReader(ds2, DS2_ID); |
| | | |
| | | /** |
| | | * RS1 restarts, DS1 should get back to normal status |
| | | */ |
| | | /* RS1 restarts, DS1 should get back to normal status */ |
| | | rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD); |
| | | checkConnection(30, DS2_ID); |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS); |
| | | waitUntiConnected(DS2_ID); |
| | | waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS); |
| | | |
| | | /** |
| | | * DS2 sends again a lot of changes to make DS1 degraded again |
| | | */ |
| | | /* DS2 sends again a lot of changes to make DS1 degraded again */ |
| | | bw = new BrokerWriter(ds2, DS2_ID, false); |
| | | bw.follow(); |
| | | Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS); |
| | | |
| | | /** |
| | | * RS1 stops to make DS1 go to not connected status (from degraded status) |
| | | */ |
| | | /* RS1 stops to make DS1 go to not connected status (from degraded status) */ |
| | | rs1.remove(); |
| | | bw.pause(); |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS); |
| | | |
| | | |
| | | /** |
| | | * DS2 restarts with up to date server state (this allows to have |
| | | * restarting RS1 not sending him some updates he already sent) |
| | | /* |
| | | * DS2 restarts with up to date server state |
| | | * (this allows to have restarting RS1 not sending him some updates he already sent) |
| | | */ |
| | | ds2.stop(); |
| | | bw.shutdown(); |
| | | br.shutdown(); |
| | | shutdown(bw); |
| | | shutdown(br); |
| | | curState = ds1.getServerState(); |
| | | ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID); |
| | | br = new BrokerReader(ds2, DS2_ID); |
| | | |
| | | /** |
| | | * RS1 restarts, DS1 should reconnect in degraded status (from not connected |
| | | * this time, not from state machine entry) |
| | | /* |
| | | * RS1 restarts, DS1 should reconnect in degraded status |
| | | * (from not connected this time, not from state machine entry) |
| | | */ |
| | | rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD); |
| | | // It is too difficult to tune the right sleep so disabling this test: |
| | |
| | | // of DS1 to NORMAL_STATUS |
| | | //sleep(2000); |
| | | //sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS); |
| | | checkConnection(30, DS2_ID); |
| | | waitUntiConnected(DS2_ID); |
| | | |
| | | /** |
| | | * DS1 should come back in normal status after a while |
| | | */ |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS); |
| | | /* DS1 should come back in normal status after a while */ |
| | | waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS); |
| | | |
| | | /** |
| | | * DS2 sends a reset gen id order with wrong gen id: DS1 should go into bad generation id status |
| | | /* |
| | | * DS2 sends a reset gen id order with wrong gen id: |
| | | * DS1 should go into bad generation id status |
| | | */ |
| | | long BAD_GEN_ID = 999999L; |
| | | resetGenId(ds2, BAD_GEN_ID); // ds2 will also go bad gen |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.BAD_GEN_ID_STATUS); |
| | | |
| | | /** |
| | | * DS2 sends again a reset gen id order with right id: DS1 should be disconnected |
| | | * by RS then reconnect and enter again in normal status. This goes through |
| | | * not connected status but not possible to check as should reconnect immediately |
| | | /* |
| | | * DS2 sends again a reset gen id order with right id: DS1 should be disconnected by RS |
| | | * then reconnect and enter again in normal status. This goes through not connected status |
| | | * but not possible to check as should reconnect immediately |
| | | */ |
| | | resetGenId(ds2, EMPTY_DN_GENID); // ds2 will also be disconnected |
| | | ds2.stop(); |
| | | br.shutdown(); // Reader could reconnect broker, but gen id would be bad: need to recreate a broker to send changex |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS); |
| | | shutdown(br); // Reader could reconnect broker, but gen id would be bad: need to recreate a |
| | | // broker to send changex |
| | | waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS); |
| | | |
| | | /** |
| | | * DS2 sends again a lot of changes to make DS1 degraded again |
| | | */ |
| | | /* DS2 sends again a lot of changes to make DS1 degraded again */ |
| | | curState = ds1.getServerState(); |
| | | ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID); |
| | | checkConnection(30, DS2_ID); |
| | | waitUntiConnected(DS2_ID); |
| | | bw = new BrokerWriter(ds2, DS2_ID, false); |
| | | br = new BrokerReader(ds2, DS2_ID); |
| | | bw.follow(); |
| | | Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS); |
| | | |
| | | /** |
| | | * DS2 sends reset gen id order with bad gen id: DS1 should go in bad gen id |
| | | * status (from degraded status this time) |
| | | /* |
| | | * DS2 sends reset gen id order with bad gen id: DS1 should go in bad gen id status |
| | | * (from degraded status this time) |
| | | */ |
| | | resetGenId(ds2, -1); // -1 to allow next step full update and flush RS db so that DS1 can reconnect after full update |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.BAD_GEN_ID_STATUS); |
| | | bw.pause(); |
| | | |
| | | /** |
| | | * DS2 engages full update (while DS1 in bad gen id status), DS1 should go |
| | | * in full update status |
| | | /* |
| | | * DS2 engages full update (while DS1 in bad gen id status), |
| | | * DS1 should go in full update status |
| | | */ |
| | | BrokerInitializer bi = new BrokerInitializer(ds2, DS2_ID, false); |
| | | bi.initFullUpdate(DS1_ID, 200); |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.FULL_UPDATE_STATUS); |
| | | |
| | | /** |
| | | /* |
| | | * DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status) |
| | | * and come back to normal status (RS genid was -1 so RS will adopt ne genb id) |
| | | * and come back to normal status (RS genid was -1 so RS will adopt new gen id) |
| | | */ |
| | | bi.runFullUpdate(); |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS); |
| | | |
| | | /** |
| | | * DS2 sends changes to DS1: DS1 should go in degraded status |
| | | */ |
| | | /* DS2 sends changes to DS1: DS1 should go in degraded status */ |
| | | ds2.stop(); // will need a new broker with another gen id restart it |
| | | bw.shutdown(); |
| | | br.shutdown(); |
| | | shutdown(bw); |
| | | shutdown(br); |
| | | long newGen = ds1.getGenerationID(); |
| | | curState = ds1.getServerState(); |
| | | ds2 = createReplicationBroker(DS2_ID, curState, newGen); |
| | | checkConnection(30, DS2_ID); |
| | | waitUntiConnected(DS2_ID); |
| | | bw = new BrokerWriter(ds2, DS2_ID, false); |
| | | br = new BrokerReader(ds2, DS2_ID); |
| | | bw.follow(); |
| | | Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS); |
| | | |
| | | /** |
| | | * DS2 engages full update (while DS1 in degraded status), DS1 should go |
| | | * in full update status |
| | | */ |
| | | /* DS2 engages full update (while DS1 in degraded status), DS1 should go in full update status */ |
| | | bi = new BrokerInitializer(ds2, DS2_ID, false); |
| | | bi.initFullUpdate(DS1_ID, 300); |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.FULL_UPDATE_STATUS); |
| | | bw.pause(); |
| | | |
| | | /** |
| | | /* |
| | | * DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status) |
| | | * and come back to bad gen id status (RS genid was another gen id (300 entries instead of 200)) |
| | | * and come back to bad gen id status (RS genid was another gen id (300 entries instead of 200) |
| | | */ |
| | | bi.runFullUpdate(); |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.BAD_GEN_ID_STATUS); |
| | | |
| | | /** |
| | | * DS2 sends reset gen id with gen id same as DS1: DS1 will be disconnected |
| | | * by RS (not connected status) and come back to normal status |
| | | /* |
| | | * DS2 sends reset gen id with gen id same as DS1: |
| | | * DS1 will be disconnected by RS (not connected status) and come back to normal status |
| | | */ |
| | | ds2.stop(); // will need a new broker with another gen id restart it |
| | | bw.shutdown(); |
| | | br.shutdown(); |
| | | shutdown(bw); |
| | | shutdown(br); |
| | | newGen = ds1.getGenerationID(); |
| | | curState = ds1.getServerState(); |
| | | ds2 = createReplicationBroker(DS2_ID, curState, newGen); |
| | | checkConnection(30, DS2_ID); |
| | | waitUntiConnected(DS2_ID); |
| | | br = new BrokerReader(ds2, DS2_ID); |
| | | resetGenId(ds2, newGen); // Make DS1 reconnect in normal status |
| | | |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS); |
| | | |
| | | /** |
| | | * DS2 engages full update (while DS1 in normal status), DS1 should go |
| | | * in full update status |
| | | */ |
| | | /* DS2 engages full update (while DS1 in normal status), DS1 should go in full update status */ |
| | | bi = new BrokerInitializer(ds2, DS2_ID, false); |
| | | bi.initFullUpdate(DS1_ID, 300); // 300 entries will compute same genid of the RS |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS); |
| | | waitUntilStatusEquals(ds1, ServerStatus.FULL_UPDATE_STATUS); |
| | | |
| | | /** |
| | | /* |
| | | * DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status) |
| | | * and come back to normal status (process full update with same data as |
| | | * before so RS already has right gen id: version with 300 entries) |
| | | * and come back to normal status (process full update with same data as before so RS already |
| | | * has right gen id: version with 300 entries) |
| | | */ |
| | | bi.runFullUpdate(); |
| | | ds2.stop(); |
| | | br.shutdown(); |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS); |
| | | shutdown(br); |
| | | waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS); |
| | | |
| | | /** |
| | | * RS1 stops, DS1 should go to not connected status |
| | | */ |
| | | /* RS1 stops, DS1 should go to not connected status */ |
| | | rs1.remove(); |
| | | sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS); |
| | | |
| | | waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS); |
| | | } finally |
| | | { |
| | | // Finalize test |
| | | endTest(); |
| | | if (bw != null) |
| | | { |
| | | bw.shutdown(); |
| | | } |
| | | if (br != null) |
| | | { |
| | | br.shutdown(); |
| | | } |
| | | shutdown(bw); |
| | | shutdown(br); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | private class BrokerInitializer |
| | | { |
| | | |
| | | private ReplicationBroker rb; |
| | | private int serverId = -1; |
| | | private long userId; |
| | |
| | | */ |
| | | private BrokerReader reader; |
| | | |
| | | /** |
| | | * Creates a broker initializer. Also creates a reader according to request |
| | | */ |
| | | /** Creates a broker initializer. Also creates a reader according to request */ |
| | | public BrokerInitializer(ReplicationBroker rb, int serverId, |
| | | boolean createReader) |
| | | { |
| | |
| | | this.createReader = createReader; |
| | | } |
| | | |
| | | /** |
| | | * Initializes a full update session by sending InitializeTargetMsg. |
| | | */ |
| | | /** Initializes a full update session by sending InitializeTargetMsg. */ |
| | | public void initFullUpdate(int destId, long nEntries) |
| | | { |
| | | // Also create reader ? |
| | |
| | | |
| | | if (createReader) |
| | | { |
| | | reader.shutdown(); |
| | | shutdown(reader); |
| | | } |
| | | |
| | | debugInfo("Broker " + serverId + " initializer thread is dying"); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Thread for sending a lot of changes through a broker. |
| | | */ |
| | | /** Thread for sending a lot of changes through a broker. */ |
| | | private class BrokerWriter extends Thread |
| | | { |
| | | |
| | | private ReplicationBroker rb; |
| | | private int serverId = -1; |
| | | private long userId; |
| | |
| | | debugInfo("Broker " + serverId + " writer thread is dying"); |
| | | } |
| | | |
| | | /** |
| | | * Stops the writer thread. |
| | | */ |
| | | /** Stops the writer thread. */ |
| | | public void shutdown() |
| | | { |
| | | suspended.set(true); // If were working |
| | |
| | | } |
| | | |
| | | // Stop reader if any |
| | | if (reader != null) |
| | | { |
| | | reader.shutdown(); |
| | | } |
| | | StateMachineTest.shutdown(reader); |
| | | } |
| | | |
| | | /** |
| | | * Suspends the writer thread. |
| | | */ |
| | | /** Suspends the writer thread. */ |
| | | public void pause() |
| | | { |
| | | if (isPaused()) |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Test if the writer is suspended. |
| | | */ |
| | | /** Test if the writer is suspended. */ |
| | | public boolean isPaused() |
| | | { |
| | | return sessionDone.get(); |
| | | } |
| | | |
| | | /** |
| | | * Resumes the writer thread until it is paused. |
| | | */ |
| | | /** Resumes the writer thread until it is paused. */ |
| | | public void follow() |
| | | { |
| | | sessionDone.set(false); |
| | |
| | | */ |
| | | private class BrokerReader extends Thread |
| | | { |
| | | |
| | | private ReplicationBroker rb; |
| | | private int serverId = -1; |
| | | private boolean shutdown; |
| | |
| | | debugInfo("Broker " + serverId + " reader thread is dying"); |
| | | } |
| | | |
| | | /** |
| | | * Returns last received message from reader When read, last value is cleared. |
| | | */ |
| | | /** Returns last received message from reader When read, last value is cleared. */ |
| | | public ReplicationMsg getLastMsg() |
| | | { |
| | | ReplicationMsg toReturn = lastMsg; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Waits for a long time for an equality condition to be true. |
| | | * Every second, the equality check is performed. After the provided amount of |
| | | * seconds, if the equality is false, an assertion error is raised. |
| | | * This methods ends either because the equality is true or if the timeout |
| | | * occurs after the provided number of seconds. |
| | | * This method is convenient when the the equality can only occur after a |
| | | * period of time which is difficult to establish, but we know it will occur |
| | | * anyway. This has 2 advantages compared to a classical code like this: |
| | | * - sleep(some time); |
| | | * - assertEquals(testedValue, expectedValue); |
| | | * 1. If the sleep value is too big, this will impact the total time of |
| | | * running tests uselessly. It may also penalize a fast running machine where |
| | | * the sleep time value may be unnecessarily to long. |
| | | * 2. If the sleep value is too small, some slow machines may have the test |
| | | * fail whereas some additional time would have made the test succeed. |
| | | * @param secTimeout Number of seconds to wait before failing. The value for |
| | | * this should be high. A timeout is needed anyway to have the test campaign |
| | | * finish anyway. |
| | | * @param testedValue The value we want to test |
| | | * @param expectedValue The value the tested value should be equal to |
| | | * Waits until the domain status reaches the expected status. |
| | | * @param domain The domain whose status we want to test |
| | | * @param expectedStatus The expected domain status |
| | | */ |
| | | private void sleepAssertStatusEquals(int secTimeout, LDAPReplicationDomain testedValue, |
| | | ServerStatus expectedValue) throws Exception |
| | | private void waitUntilStatusEquals(final LDAPReplicationDomain domain, final ServerStatus expectedStatus) throws Exception |
| | | { |
| | | assertTrue(testedValue != null && expectedValue != null, "sleepAssertStatusEquals: null parameters"); |
| | | assertNotNull(domain); |
| | | assertNotNull(expectedStatus); |
| | | |
| | | // Go out of the loop only if equality is obtained or if timeout occurs |
| | | int nSec = 0; |
| | | while (true) |
| | | TestTimer timer = new TestTimer.Builder() |
| | | .maxSleep(30, SECONDS) |
| | | .sleepTimes(500, MILLISECONDS) |
| | | .toTimer(); |
| | | timer.repeatUntilSuccess(new Callable<Void>() |
| | | { |
| | | Thread.sleep(1000); |
| | | nSec++; |
| | | |
| | | // Test equality of values |
| | | if (testedValue.getStatus().equals(expectedValue)) |
| | | @Override |
| | | public Void call() throws Exception |
| | | { |
| | | debugInfo("sleepAssertStatusEquals: equality obtained after " |
| | | + nSec + " seconds (" + expectedValue + ")."); |
| | | return; |
| | | assertEquals(domain.getStatus(), expectedStatus); |
| | | return null; |
| | | } |
| | | |
| | | // Timeout reached, end with error |
| | | assertTrue(nSec < secTimeout, "sleepAssertStatusEquals: got <" |
| | | + testedValue.getStatus() + "> where expected <" + expectedValue |
| | | + ">"); |
| | | } |
| | | }); |
| | | } |
| | | } |