From b7465fdf4336e1d2f4c7dd231940b4034b1abb0c Mon Sep 17 00:00:00 2001
From: Jean-Noël Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 07 Sep 2015 08:57:54 +0000
Subject: [PATCH] Code cleaunp in StateMachineTest + used TestTimer
---
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/StateMachineTest.java | 501 ++++++++++++++++++++++---------------------------------
1 files changed, 198 insertions(+), 303 deletions(-)
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/StateMachineTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/StateMachineTest.java
index 5a940e0..0a7b55e 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/StateMachineTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/StateMachineTest.java
@@ -31,6 +31,7 @@
import java.util.ArrayList;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.forgerock.i18n.LocalizableMessage;
@@ -46,18 +47,28 @@
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.DoneMsg;
+import org.opends.server.replication.protocol.EntryMsg;
+import org.opends.server.replication.protocol.InitializeTargetMsg;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
+import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
+import org.opends.server.util.TestTimer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static java.util.concurrent.TimeUnit.*;
+
import static org.mockito.Mockito.*;
import static org.opends.server.util.CollectionUtils.*;
import static org.testng.Assert.*;
@@ -69,7 +80,6 @@
@SuppressWarnings("javadoc")
public class StateMachineTest extends ReplicationTestCase
{
-
/** Server id definitions. */
private static final String EXAMPLE_DN = "dc=example,dc=com";
private static DN EXAMPLE_DN_;
@@ -96,6 +106,22 @@
}
}
+ private static void shutdown(BrokerReader reader)
+ {
+ if (reader != null)
+ {
+ reader.shutdown();
+ }
+ }
+
+ private static void shutdown(BrokerWriter writer)
+ {
+ if (writer != null)
+ {
+ writer.shutdown();
+ }
+ }
+
private void initTest() throws IOException
{
rs1Port = -1;
@@ -122,67 +148,41 @@
rs1Port = -1;
}
- /**
- * Check connection of the provided ds to the
- * replication server. Waits for connection to be ok up to secTimeout seconds
- * before failing.
- */
- private void checkConnection(int secTimeout, int dsId) throws Exception
+ /** Waits until the provided ds is connected to the replication server. */
+ private void waitUntiConnected(final int dsId) throws Exception
{
- ReplicationBroker rb = null;
- LDAPReplicationDomain rd = null;
+ TestTimer timer = new TestTimer.Builder()
+ .maxSleep(30, SECONDS)
+ .sleepTimes(100, MILLISECONDS)
+ .toTimer();
+ timer.repeatUntilSuccess(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ assertTrue(isConnected(dsId), "checkConnection: DS " + dsId + " is not connected to the RS");
+ return null;
+ }
+ });
+ }
+
+ private boolean isConnected(final int dsId)
+ {
switch (dsId)
{
- case DS1_ID:
- rd = ds1;
- break;
- case DS2_ID:
- rb = ds2;
- break;
- case DS3_ID:
- rb = ds3;
- break;
- default:
- fail("Unknown ds server id.");
- }
-
- int nSec = 0;
-
- // Go out of the loop only if connection is verified or if timeout occurs
- while (true)
- {
- // Test connection
- boolean connected = false;
- if (rd != null)
- {
- connected = rd.isConnected();
- }
- else
- {
- connected = rb.isConnected();
- }
-
- if (connected)
- {
- // Connection verified
- debugInfo("checkConnection: connection of DS " + dsId +
- " to RS obtained after " + nSec + " seconds.");
- return;
- }
-
- Thread.sleep(100);
- nSec++;
-
- // Timeout reached, end with error
- assertFalse(nSec > secTimeout * 10,
- "checkConnection: DS " + dsId + " is not connected to the RS after "
- + secTimeout + " seconds.");
+ case DS1_ID:
+ return ds1.isConnected();
+ case DS2_ID:
+ return ds2.isConnected();
+ case DS3_ID:
+ return ds3.isConnected();
+ default:
+ fail("Unknown ds server id.");
+ return false;
}
}
- /**
- * Creates a new ReplicationServer.
- */
+ /** Creates a new ReplicationServer. */
private ReplicationServer createReplicationServer(String testCase,
int degradedStatusThreshold) throws Exception
{
@@ -195,10 +195,7 @@
return new ReplicationServer(conf);
}
- /**
- * Creates and starts a new ReplicationDomain configured for the replication
- * server.
- */
+ /** Creates and starts a new ReplicationDomain configured for the replication server. */
@SuppressWarnings("unchecked")
private LDAPReplicationDomain createReplicationDomain(int dsId) throws Exception
{
@@ -263,25 +260,17 @@
try
{
-
- /**
- * DS1 start, no RS available: DS1 should be in not connected status
- */
+ // DS1 start, no RS available: DS1 should be in not connected status
ds1 = createReplicationDomain(DS1_ID);
- sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
- /**
- * RS1 starts , DS1 should connect to it and be in normal status
- */
+ // RS1 starts , DS1 should connect to it and be in normal status
rs1 = createReplicationServer(testCase, 5000);
- sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
- /**
- * RS1 stops, DS1 should go in not connected status
- */
+ // RS1 stops, DS1 should go in not connected status
rs1.remove();
- sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
-
+ waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
} finally
{
endTest();
@@ -324,26 +313,21 @@
try
{
- /**
- * RS1 starts with specified threshold value
- */
+ /* RS1 starts with specified threshold value */
rs1 = createReplicationServer(testCase, thresholdValue);
- /**
- * DS2 starts and connects to RS1. No reader and low window value at the
- * beginning so writer for DS2 in RS should enqueue changes after first
- * changes sent to DS. (window value reached: a window msg needed by RS for
- * following sending changes to DS)
+ /*
+ * DS2 starts and connects to RS1. No reader and low window value at the beginning
+ * so writer for DS2 in RS should enqueue changes after first changes sent to DS.
+ * (window value reached: a window msg needed by RS for following sending changes to DS)
*/
ds2 = createReplicationBroker(DS2_ID, new ServerState(), EMPTY_DN_GENID);
- checkConnection(30, DS2_ID);
+ waitUntiConnected(DS2_ID);
- /**
- * DS3 starts and connects to RS1
- */
+ /* DS3 starts and connects to RS1 */
ds3 = createReplicationBroker(DS3_ID, new ServerState(), EMPTY_DN_GENID);
br3 = new BrokerReader(ds3, DS3_ID);
- checkConnection(30, DS3_ID);
+ waitUntiConnected(DS3_ID);
// Send first changes to reach window and block DS2 writer queue. Writer will take them
// from queue and block (no more changes removed from writer queue) after
@@ -351,10 +335,9 @@
bw = new BrokerWriter(ds3, DS3_ID, false);
bw.followAndPause(11);
- /**
- * DS3 sends changes (less than threshold): DS2 should still be in normal
- * status so no topo message should be sent (update topo message
- * for telling status of DS2 changed)
+ /*
+ * DS3 sends changes (less than threshold): DS2 should still be in normal status
+ * so no topo message should be sent (update topo message for telling status of DS2 changed)
*/
int nChangesSent = 0;
if (thresholdValue > 1)
@@ -367,49 +350,39 @@
assertNull(msg, (msg != null) ? msg.toString() : "null");
}
- /**
- * DS3 sends changes to reach the threshold value, DS3 should receive an
- * update topo message with status of DS2: degraded status
+ /*
+ * DS3 sends changes to reach the threshold value,
+ * DS3 should receive an update topo message with status of DS2: degraded status
*/
bw.followAndPause(thresholdValue - nChangesSent);
// wait for a status MSG status analyzer to broker 3
waitForDegradedStatusOnBroker3();
- /**
- * DS3 sends 10 additional changes after threshold value, DS2 should still be
- * degraded so no topo message received.
+ /*
+ * DS3 sends 10 additional changes after threshold value,
+ * DS2 should still be degraded so no topo message received.
*/
bw.followAndPause(10);
- bw.shutdown();
+ shutdown(bw);
Thread.sleep(1000); // Be sure status analyzer has time to test
ReplicationMsg lastMsg = br3.getLastMsg();
ReplicationMsg msg = br3.getLastMsg();
debugInfo(testCase + " Step 3: last message from writer: " + msg);
assertNull(lastMsg);
- /**
+ /*
* DS2 replays every changes and should go back to normal status
* (create a reader to emulate replay of messages (messages read from queue))
*/
br2 = new BrokerReader(ds2, DS2_ID);
// wait for a status MSG status analyzer to broker 3
waitForDegradedStatusOnBroker3();
-
} finally
{
endTest();
- if (bw != null)
- {
- bw.shutdown();
- }
- if (br3 != null)
- {
- br3.shutdown();
- }
- if (br2 != null)
- {
- br2.shutdown();
- }
+ shutdown(bw);
+ shutdown(br3);
+ shutdown(br2);
}
}
@@ -462,85 +435,72 @@
// DS2 starts and connects to RS1
ds2 = createReplicationBroker(DS2_ID, new ServerState(), EMPTY_DN_GENID);
br = new BrokerReader(ds2, DS2_ID);
- checkConnection(30, DS2_ID);
+ waitUntiConnected(DS2_ID);
// DS2 starts sending a lot of changes
bw = new BrokerWriter(ds2, DS2_ID, false);
bw.follow();
Thread.sleep(1000); // Let some messages being queued in RS
- /**
- * DS1 starts and connects to RS1, server state exchange should lead to
- * start in degraded status as some changes should be in queued in the RS
- * and the threshold value is 1 change in queue.
+ /*
+ * DS1 starts and connects to RS1, server state exchange should lead to start in degraded status
+ * as some changes should be in queued in the RS and the threshold value is 1 change in queue.
*/
ds1 = createReplicationDomain(DS1_ID);
- checkConnection(30, DS1_ID);
- sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
+ waitUntiConnected(DS1_ID);
+ waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS);
- /**
- * DS2 stops sending changes: DS1 should replay pending changes and should
- * enter the normal status
- */
+ /* DS2 stops sending changes: DS1 should replay pending changes and should enter the normal status */
bw.pause();
// Sleep enough so that replay can be done and analyzer has time
// to see that the queue length is now under the threshold value.
- sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
- /**
- * RS1 stops to make DS1 go to not connected status (from normal status)
- */
+ /* RS1 stops to make DS1 go to not connected status (from normal status) */
rs1.remove();
- sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
- /**
- * DS2 restarts with up to date server state (this allows to have
- * restarting RS1 not sending him some updates he already sent)
+ /*
+ * DS2 restarts with up to date server state
+ * (this allows to have restarting RS1 not sending him some updates he already sent)
*/
ds2.stop();
- bw.shutdown();
- br.shutdown();
+ shutdown(bw);
+ shutdown(br);
ServerState curState = ds1.getServerState();
ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
br = new BrokerReader(ds2, DS2_ID);
- /**
- * RS1 restarts, DS1 should get back to normal status
- */
+ /* RS1 restarts, DS1 should get back to normal status */
rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD);
- checkConnection(30, DS2_ID);
- sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
+ waitUntiConnected(DS2_ID);
+ waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
- /**
- * DS2 sends again a lot of changes to make DS1 degraded again
- */
+ /* DS2 sends again a lot of changes to make DS1 degraded again */
bw = new BrokerWriter(ds2, DS2_ID, false);
bw.follow();
Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change
- sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS);
- /**
- * RS1 stops to make DS1 go to not connected status (from degraded status)
- */
+ /* RS1 stops to make DS1 go to not connected status (from degraded status) */
rs1.remove();
bw.pause();
- sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
-
- /**
- * DS2 restarts with up to date server state (this allows to have
- * restarting RS1 not sending him some updates he already sent)
+ /*
+ * DS2 restarts with up to date server state
+ * (this allows to have restarting RS1 not sending him some updates he already sent)
*/
ds2.stop();
- bw.shutdown();
- br.shutdown();
+ shutdown(bw);
+ shutdown(br);
curState = ds1.getServerState();
ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
br = new BrokerReader(ds2, DS2_ID);
- /**
- * RS1 restarts, DS1 should reconnect in degraded status (from not connected
- * this time, not from state machine entry)
+ /*
+ * RS1 restarts, DS1 should reconnect in degraded status
+ * (from not connected this time, not from state machine entry)
*/
rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD);
// It is too difficult to tune the right sleep so disabling this test:
@@ -548,149 +508,130 @@
// of DS1 to NORMAL_STATUS
//sleep(2000);
//sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
- checkConnection(30, DS2_ID);
+ waitUntiConnected(DS2_ID);
- /**
- * DS1 should come back in normal status after a while
- */
- sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
+ /* DS1 should come back in normal status after a while */
+ waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
- /**
- * DS2 sends a reset gen id order with wrong gen id: DS1 should go into bad generation id status
+ /*
+ * DS2 sends a reset gen id order with wrong gen id:
+ * DS1 should go into bad generation id status
*/
long BAD_GEN_ID = 999999L;
resetGenId(ds2, BAD_GEN_ID); // ds2 will also go bad gen
- sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.BAD_GEN_ID_STATUS);
- /**
- * DS2 sends again a reset gen id order with right id: DS1 should be disconnected
- * by RS then reconnect and enter again in normal status. This goes through
- * not connected status but not possible to check as should reconnect immediately
+ /*
+ * DS2 sends again a reset gen id order with right id: DS1 should be disconnected by RS
+ * then reconnect and enter again in normal status. This goes through not connected status
+ * but not possible to check as should reconnect immediately
*/
resetGenId(ds2, EMPTY_DN_GENID); // ds2 will also be disconnected
ds2.stop();
- br.shutdown(); // Reader could reconnect broker, but gen id would be bad: need to recreate a broker to send changex
- sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
+ shutdown(br); // Reader could reconnect broker, but gen id would be bad: need to recreate a
+ // broker to send changex
+ waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
- /**
- * DS2 sends again a lot of changes to make DS1 degraded again
- */
+ /* DS2 sends again a lot of changes to make DS1 degraded again */
curState = ds1.getServerState();
ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
- checkConnection(30, DS2_ID);
+ waitUntiConnected(DS2_ID);
bw = new BrokerWriter(ds2, DS2_ID, false);
br = new BrokerReader(ds2, DS2_ID);
bw.follow();
Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change
- sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS);
- /**
- * DS2 sends reset gen id order with bad gen id: DS1 should go in bad gen id
- * status (from degraded status this time)
+ /*
+ * DS2 sends reset gen id order with bad gen id: DS1 should go in bad gen id status
+ * (from degraded status this time)
*/
resetGenId(ds2, -1); // -1 to allow next step full update and flush RS db so that DS1 can reconnect after full update
- sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.BAD_GEN_ID_STATUS);
bw.pause();
- /**
- * DS2 engages full update (while DS1 in bad gen id status), DS1 should go
- * in full update status
+ /*
+ * DS2 engages full update (while DS1 in bad gen id status),
+ * DS1 should go in full update status
*/
BrokerInitializer bi = new BrokerInitializer(ds2, DS2_ID, false);
bi.initFullUpdate(DS1_ID, 200);
- sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.FULL_UPDATE_STATUS);
- /**
+ /*
* DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
- * and come back to normal status (RS genid was -1 so RS will adopt ne genb id)
+ * and come back to normal status (RS genid was -1 so RS will adopt new gen id)
*/
bi.runFullUpdate();
- sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
- /**
- * DS2 sends changes to DS1: DS1 should go in degraded status
- */
+ /* DS2 sends changes to DS1: DS1 should go in degraded status */
ds2.stop(); // will need a new broker with another gen id restart it
- bw.shutdown();
- br.shutdown();
+ shutdown(bw);
+ shutdown(br);
long newGen = ds1.getGenerationID();
curState = ds1.getServerState();
ds2 = createReplicationBroker(DS2_ID, curState, newGen);
- checkConnection(30, DS2_ID);
+ waitUntiConnected(DS2_ID);
bw = new BrokerWriter(ds2, DS2_ID, false);
br = new BrokerReader(ds2, DS2_ID);
bw.follow();
Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change
- sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS);
- /**
- * DS2 engages full update (while DS1 in degraded status), DS1 should go
- * in full update status
- */
+ /* DS2 engages full update (while DS1 in degraded status), DS1 should go in full update status */
bi = new BrokerInitializer(ds2, DS2_ID, false);
bi.initFullUpdate(DS1_ID, 300);
- sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.FULL_UPDATE_STATUS);
bw.pause();
- /**
+ /*
* DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
- * and come back to bad gen id status (RS genid was another gen id (300 entries instead of 200))
+ * and come back to bad gen id status (RS genid was another gen id (300 entries instead of 200)
*/
bi.runFullUpdate();
- sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.BAD_GEN_ID_STATUS);
- /**
- * DS2 sends reset gen id with gen id same as DS1: DS1 will be disconnected
- * by RS (not connected status) and come back to normal status
+ /*
+ * DS2 sends reset gen id with gen id same as DS1:
+ * DS1 will be disconnected by RS (not connected status) and come back to normal status
*/
ds2.stop(); // will need a new broker with another gen id restart it
- bw.shutdown();
- br.shutdown();
+ shutdown(bw);
+ shutdown(br);
newGen = ds1.getGenerationID();
curState = ds1.getServerState();
ds2 = createReplicationBroker(DS2_ID, curState, newGen);
- checkConnection(30, DS2_ID);
+ waitUntiConnected(DS2_ID);
br = new BrokerReader(ds2, DS2_ID);
resetGenId(ds2, newGen); // Make DS1 reconnect in normal status
- sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
- /**
- * DS2 engages full update (while DS1 in normal status), DS1 should go
- * in full update status
- */
+ /* DS2 engages full update (while DS1 in normal status), DS1 should go in full update status */
bi = new BrokerInitializer(ds2, DS2_ID, false);
bi.initFullUpdate(DS1_ID, 300); // 300 entries will compute same genid of the RS
- sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS);
+ waitUntilStatusEquals(ds1, ServerStatus.FULL_UPDATE_STATUS);
- /**
+ /*
* DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
- * and come back to normal status (process full update with same data as
- * before so RS already has right gen id: version with 300 entries)
+ * and come back to normal status (process full update with same data as before so RS already
+ * has right gen id: version with 300 entries)
*/
bi.runFullUpdate();
ds2.stop();
- br.shutdown();
- sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
+ shutdown(br);
+ waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
- /**
- * RS1 stops, DS1 should go to not connected status
- */
+ /* RS1 stops, DS1 should go to not connected status */
rs1.remove();
- sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
-
+ waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
} finally
{
// Finalize test
endTest();
- if (bw != null)
- {
- bw.shutdown();
- }
- if (br != null)
- {
- br.shutdown();
- }
+ shutdown(bw);
+ shutdown(br);
}
}
@@ -750,7 +691,6 @@
*/
private class BrokerInitializer
{
-
private ReplicationBroker rb;
private int serverId = -1;
private long userId;
@@ -769,9 +709,7 @@
*/
private BrokerReader reader;
- /**
- * Creates a broker initializer. Also creates a reader according to request
- */
+ /** Creates a broker initializer. Also creates a reader according to request */
public BrokerInitializer(ReplicationBroker rb, int serverId,
boolean createReader)
{
@@ -780,9 +718,7 @@
this.createReader = createReader;
}
- /**
- * Initializes a full update session by sending InitializeTargetMsg.
- */
+ /** Initializes a full update session by sending InitializeTargetMsg. */
public void initFullUpdate(int destId, long nEntries)
{
// Also create reader ?
@@ -860,19 +796,16 @@
if (createReader)
{
- reader.shutdown();
+ shutdown(reader);
}
debugInfo("Broker " + serverId + " initializer thread is dying");
}
}
- /**
- * Thread for sending a lot of changes through a broker.
- */
+ /** Thread for sending a lot of changes through a broker. */
private class BrokerWriter extends Thread
{
-
private ReplicationBroker rb;
private int serverId = -1;
private long userId;
@@ -993,9 +926,7 @@
debugInfo("Broker " + serverId + " writer thread is dying");
}
- /**
- * Stops the writer thread.
- */
+ /** Stops the writer thread. */
public void shutdown()
{
suspended.set(true); // If were working
@@ -1013,15 +944,10 @@
}
// Stop reader if any
- if (reader != null)
- {
- reader.shutdown();
- }
+ StateMachineTest.shutdown(reader);
}
- /**
- * Suspends the writer thread.
- */
+ /** Suspends the writer thread. */
public void pause()
{
if (isPaused())
@@ -1036,17 +962,13 @@
}
}
- /**
- * Test if the writer is suspended.
- */
+ /** Test if the writer is suspended. */
public boolean isPaused()
{
return sessionDone.get();
}
- /**
- * Resumes the writer thread until it is paused.
- */
+ /** Resumes the writer thread until it is paused. */
public void follow()
{
sessionDone.set(false);
@@ -1131,7 +1053,6 @@
*/
private class BrokerReader extends Thread
{
-
private ReplicationBroker rb;
private int serverId = -1;
private boolean shutdown;
@@ -1171,9 +1092,7 @@
debugInfo("Broker " + serverId + " reader thread is dying");
}
- /**
- * Returns last received message from reader When read, last value is cleared.
- */
+ /** Returns last received message from reader When read, last value is cleared. */
public ReplicationMsg getLastMsg()
{
ReplicationMsg toReturn = lastMsg;
@@ -1197,51 +1116,27 @@
}
/**
- * Waits for a long time for an equality condition to be true.
- * Every second, the equality check is performed. After the provided amount of
- * seconds, if the equality is false, an assertion error is raised.
- * This methods ends either because the equality is true or if the timeout
- * occurs after the provided number of seconds.
- * This method is convenient when the the equality can only occur after a
- * period of time which is difficult to establish, but we know it will occur
- * anyway. This has 2 advantages compared to a classical code like this:
- * - sleep(some time);
- * - assertEquals(testedValue, expectedValue);
- * 1. If the sleep value is too big, this will impact the total time of
- * running tests uselessly. It may also penalize a fast running machine where
- * the sleep time value may be unnecessarily to long.
- * 2. If the sleep value is too small, some slow machines may have the test
- * fail whereas some additional time would have made the test succeed.
- * @param secTimeout Number of seconds to wait before failing. The value for
- * this should be high. A timeout is needed anyway to have the test campaign
- * finish anyway.
- * @param testedValue The value we want to test
- * @param expectedValue The value the tested value should be equal to
+ * Waits until the domain status reaches the expected status.
+ * @param domain The domain whose status we want to test
+ * @param expectedStatus The expected domain status
*/
- private void sleepAssertStatusEquals(int secTimeout, LDAPReplicationDomain testedValue,
- ServerStatus expectedValue) throws Exception
+ private void waitUntilStatusEquals(final LDAPReplicationDomain domain, final ServerStatus expectedStatus) throws Exception
{
- assertTrue(testedValue != null && expectedValue != null, "sleepAssertStatusEquals: null parameters");
+ assertNotNull(domain);
+ assertNotNull(expectedStatus);
- // Go out of the loop only if equality is obtained or if timeout occurs
- int nSec = 0;
- while (true)
+ TestTimer timer = new TestTimer.Builder()
+ .maxSleep(30, SECONDS)
+ .sleepTimes(500, MILLISECONDS)
+ .toTimer();
+ timer.repeatUntilSuccess(new Callable<Void>()
{
- Thread.sleep(1000);
- nSec++;
-
- // Test equality of values
- if (testedValue.getStatus().equals(expectedValue))
+ @Override
+ public Void call() throws Exception
{
- debugInfo("sleepAssertStatusEquals: equality obtained after "
- + nSec + " seconds (" + expectedValue + ").");
- return;
+ assertEquals(domain.getStatus(), expectedStatus);
+ return null;
}
-
- // Timeout reached, end with error
- assertTrue(nSec < secTimeout, "sleepAssertStatusEquals: got <"
- + testedValue.getStatus() + "> where expected <" + expectedValue
- + ">");
- }
+ });
}
}
--
Gitblit v1.10.0