| | |
| | | ReplicationBroker server = null; |
| | | BrokerReader reader = null; |
| | | int TOTAL_MSG = 1000; // number of messages to send during the test |
| | | int CLIENT_THREADS = 2; // number of threads that will try to read |
| | | int CLIENT_THREADS = 3; // number of threads that will try to read |
| | | // the messages |
| | | ChangeNumberGenerator gen = |
| | | new ChangeNumberGenerator((short)5 , (long) 0); |
| | |
| | | */ |
| | | server = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING), (short) 5, 100, replicationServerPort, |
| | | 1000, 1000, 0, true); |
| | | 100000, 1000, 0, false); |
| | | |
| | | assertTrue(server.isConnected()); |
| | | |
| | | reader = new BrokerReader(server); |
| | | reader = new BrokerReader(server, TOTAL_MSG); |
| | | |
| | | /* |
| | | * Start the client threads. |
| | |
| | | DN.decode(TEST_ROOT_DN_STRING), (short) (100+i), 100, replicationServerPort, |
| | | 1000, true); |
| | | assertTrue(clientBroker[i].isConnected()); |
| | | client[i] = new BrokerReader(clientBroker[i]); |
| | | client[i] = new BrokerReader(clientBroker[i], TOTAL_MSG); |
| | | } |
| | | |
| | | for (int i =0; i< CLIENT_THREADS; i++) |
| | |
| | | finally |
| | | { |
| | | if (reader != null) |
| | | reader.join(); |
| | | { |
| | | reader.join(10000); |
| | | } |
| | | if (server != null) |
| | | { |
| | | server.stop(); |
| | | } |
| | | for (int i =0; i< CLIENT_THREADS; i++) |
| | | { |
| | | if (client[i] != null) |
| | | client[i].join(); |
| | | { |
| | | client[i].join(10000); |
| | | client[i].interrupt(); |
| | | } |
| | | |
| | | } |
| | | for (int i =0; i< CLIENT_THREADS; i++) |
| | | { |
| | |
| | | new ChangeNumberGenerator(serverId , (long) 0); |
| | | broker[i] = |
| | | openReplicationSession( DN.decode(TEST_ROOT_DN_STRING), serverId, |
| | | 100, replicationServerPort, 1000, 1000, 0, true); |
| | | 100, replicationServerPort, 100000, 1000, 0, false); |
| | | |
| | | assertTrue(broker[i].isConnected()); |
| | | |
| | | producer[i] = new BrokerWriter(broker[i], gen, TOTAL_MSG/THREADS); |
| | | reader[i] = new BrokerReader(broker[i]); |
| | | reader[i] = new BrokerReader(broker[i], (TOTAL_MSG/THREADS)*(THREADS-1)); |
| | | } |
| | | |
| | | for (int i =0; i< THREADS; i++) |
| | |
| | | for (int i = 0; i< THREADS; i++) |
| | | { |
| | | if (producer[i] != null) |
| | | producer[i].join(); |
| | | producer[i].join(10000); |
| | | // kill the thread in case it is not yet stopped. |
| | | producer[i].interrupt(); |
| | | } |
| | | for (int i = 0; i< THREADS; i++) |
| | | { |
| | | if (reader[i] != null) |
| | | reader[i].join(); |
| | | reader[i].join(10000); |
| | | // kill the thread in case it is not yet stopped. |
| | | reader[i].interrupt(); |
| | | } |
| | | for (int i = 0; i< THREADS; i++) |
| | | { |
| | |
| | | private class BrokerReader extends Thread |
| | | { |
| | | private ReplicationBroker broker; |
| | | private int numMsgRcv = 0; |
| | | private final int numMsgExpected; |
| | | |
| | | /** |
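| | | * Creates a new Stress Test Reader. |
| | | * @param broker The replication broker from which this reader receives messages. |
| | | * @param numMsgExpected The number of update messages after which this reader stops reading. |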
| | | */ |
| | | public BrokerReader(ReplicationBroker broker) |
| | | public BrokerReader(ReplicationBroker broker, int numMsgExpected) |
| | | { |
| | | this.broker = broker; |
| | | this.numMsgExpected = numMsgExpected; |
| | | } |
| | | |
| | | /** |
| | |
| | | while (true) |
| | | { |
| | | ReplicationMsg msg = broker.receive(); |
| | | broker.updateWindowAfterReplay(); |
| | | if (msg == null) |
| | | if (msg instanceof UpdateMsg) |
| | | { |
| | | numMsgRcv++; |
| | | broker.updateWindowAfterReplay(); |
| | | } |
| | | if ((msg == null) || (numMsgRcv >= numMsgExpected)) |
| | | break; |
| | | } |
| | | } catch (Exception e) { |
| | | } catch (SocketTimeoutException e) |
| | | { |
| | | assertTrue((numMsgRcv == numMsgExpected), |
| | | "a BrokerReader did not received the expected message number :" |
| | | + numMsgRcv + " " + numMsgExpected); |
| | | } catch (Exception e) |
| | | { |
| | | assertTrue(false, |
| | | "a BrokerReader received an Exception" + e.getMessage()); |
| | | } |
| | | } |
| | | } |
| | |
| | | private Entry createExportAllTask() |
| | | throws Exception |
| | | { |
| | | String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); |
| | | String path = "exportLDIF.ldif"; |
| | | return TestCaseUtils.makeEntry( |
| | | "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", |
| | |
| | | throws Exception |
| | | { |
| | | String root = suffix.substring(suffix.indexOf('=')+1, suffix.indexOf(',')); |
| | | String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); |
| | | String path = "exportLDIF" + root +".ldif"; |
| | | return TestCaseUtils.makeEntry( |
| | | "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", |