| | |
| | | reader[i].start(); |
| | | } |
| | | debugInfo("multipleWriterMultipleReader produces and readers started"); |
| | | Thread.sleep(2000); |
| | | } |
| | | finally |
| | | { |
| | | debugInfo("multipleWriterMultipleReader wait producers"); |
| | | debugInfo("multipleWriterMultipleReader wait producers end"); |
| | | for (int i = 0; i< THREADS; i++) |
| | | { |
| | | if (producer[i] != null) |
| | | { |
| | | producer[i].join(10000); |
| | | // kill the thread in case it is not yet stopped. |
| | | producer[i].interrupt(); |
| | | // kill the thread in case it is not yet stopped. |
| | | producer[i].interrupt(); |
| | | } |
| | | } |
| | | debugInfo("multipleWriterMultipleReader producers done, wait readers"); |
| | | debugInfo("multipleWriterMultipleReader producers ended, now wait readers end"); |
| | | for (int i = 0; i< THREADS; i++) |
| | | { |
| | | if (reader[i] != null) |
| | | reader[i].join(10000); |
| | | // kill the thread in case it is not yet stopped. |
| | | assertTrue(reader[i].exc==null, |
| | | reader[i].exc + " " + reader[i].errDetails); |
| | | reader[i].interrupt(); |
| | | } |
| | | debugInfo("multipleWriterMultipleReader reader's done"); |
| | | debugInfo("multipleWriterMultipleReader reader's ended, now stop brokers"); |
| | | for (int i = 0; i< THREADS; i++) |
| | | { |
| | | if (broker[i] != null) |
| | |
| | | private ReplicationBroker broker; |
| | | private int numMsgRcv = 0; |
| | | private final int numMsgExpected; |
| | | public Exception exc; |
| | | public String errDetails; |
| | | |
| | | /** |
| | | * Creates a new Stress Test Reader |
| | |
| | | } |
| | | if ((msg == null) || (numMsgRcv >= numMsgExpected)) |
| | | break; |
| | | } |
| | | } |
| | | } catch (SocketTimeoutException e) |
| | | { |
| | | assertTrue((numMsgRcv == numMsgExpected), |
| | | "a BrokerReader did not received the expected message number :" |
| | | + numMsgRcv + " " + numMsgExpected); |
| | | if (numMsgRcv != numMsgExpected) |
| | | { |
| | | this.exc = e; |
| | | this.errDetails = |
| | | "BrokerReader " + broker.getServerId() |
| | | + " did not received the expected message number : act=" |
| | | + numMsgRcv + " exp=" + numMsgExpected; |
| | | } |
| | | } catch (Exception e) |
| | | { |
| | | assertTrue(false, |
| | | this.exc = e; |
| | | this.errDetails = |
| | | "a BrokerReader received an Exception" + e.getMessage() |
| | | + stackTraceToSingleLineString(e)); |
| | | + stackTraceToSingleLineString(e); |
| | | } |
| | | } |
| | | } |
| | |
| | | @Override |
| | | public void run() |
| | | { |
| | | debugInfo("BrokerWriter " + broker.getServerId() + " starts"); |
| | | debugInfo("writer " + broker.getServerId() + " starts to produce " + count); |
| | | int ccount = count; |
| | | /* |
| | | * Simple loop creating changes and sending them |
| | |
| | | broker.publish(msg); |
| | | |
| | | if ((count % 10) == 0) |
| | | debugInfo("BrokerWriter " + broker.getServerId() + " sent="+count); |
| | | debugInfo("writer " + broker.getServerId() + " to send="+count); |
| | | } |
| | | debugInfo("BrokerWriter " + broker.getServerId() + " ends sent="+ccount); |
| | | debugInfo("writer " + broker.getServerId() + " ends sent="+ccount); |
| | | } |
| | | } |
| | | |