| | |
| | | for (BrokerReader r : reader) |
| | | { |
| | | if (r != null) |
| | | { |
| | | assertNull(r.errDetails, r.exc + " " + r.errDetails); |
| | | } |
| | | } |
| | | } |
| | | debugInfo("Ending multipleWriterMultipleReader"); |
| | |
| | | { |
| | | ReplicationMsg msg = broker2.receive(); |
| | | if (msg == null) |
| | | { |
| | | break; |
| | | } |
| | | if (msg instanceof TopologyMsg) |
| | | { |
| | | continue; // ignore |
| | | } |
| | | msgs.add(msg); |
| | | |
| | | broker2.updateWindowAfterReplay(); |
| | |
| | | // may prevent to process a WindowMsg that would unblock the dual |
| | | // writer thread. |
| | | if (msg == null) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | } catch (SocketTimeoutException e) |
| | | { |
| | |
| | | broker.publish(msg); |
| | | |
| | | if ((count % 10) == 0) |
| | | debugInfo("writer " + broker.getServerId() + " to send="+count); |
| | | { |
| | | debugInfo("writer " + broker.getServerId() + " to send="+count); |
| | | } |
| | | } |
| | | debugInfo("writer " + broker.getServerId() + " ends sent="+ccount); |
| | | } |
| | |
| | | // Connect only replicationServer[0] to ReplicationServer[1] |
| | | // and not the other way |
| | | if (i==0) |
| | | { |
| | | servers.add("localhost:" + changelogPorts[1]); |
| | | } |
| | | ReplServerFakeConfiguration conf = |
| | | new ReplServerFakeConfiguration(changelogPorts[i], "replicationServerTestReplicationServerConnectedDb"+i, replicationDbImplementation, |
| | | 0, changelogIds[i], 0, 100, servers); |