| | |
| | | /** |
| | | * A set of counters used for Monitoring. |
| | | */ |
| | | private AtomicInteger numProcessedUpdates = new AtomicInteger(); |
| | | private AtomicInteger numProcessedUpdates = new AtomicInteger(0); |
| | | private AtomicInteger numRcvdUpdates = new AtomicInteger(0); |
| | | private AtomicInteger numSentUpdates = new AtomicInteger(0); |
| | | |
| | |
| | | private Map<Short,Integer> assuredSrServerNotAcknowledgedUpdates = |
| | | new HashMap<Short,Integer>(); |
| | | // Number of updates received in Assured Mode, Safe Read request |
| | | private AtomicInteger receivedAssuredSrUpdates = new AtomicInteger(0); |
| | | private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0); |
| | | // Number of updates received in Assured Mode, Safe Read request that we have |
| | | // acked without errors |
| | | private AtomicInteger receivedAssuredSrUpdatesAcked = new AtomicInteger(0); |
| | | private AtomicInteger assuredSrReceivedUpdatesAcked = new AtomicInteger(0); |
| | | // Number of updates received in Assured Mode, Safe Read request that we have |
| | | // acked with errors |
| | | private AtomicInteger receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0); |
| | | private AtomicInteger assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0); |
| | | // Number of updates sent in Assured Mode, Safe Data |
| | | private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0); |
| | | // Number of updates sent in Assured Mode, Safe Data, that have been |
| | |
| | | if ( update.isAssured() && (update.getAssuredMode() == |
| | | AssuredMode.SAFE_READ_MODE) && (rsGroupId == groupId) ) |
| | | { |
| | | receivedAssuredSrUpdates.incrementAndGet(); |
| | | assuredSrReceivedUpdates.incrementAndGet(); |
| | | } |
| | | return update; |
| | | } |
| | |
| | | |
| | | if (newStatus != status) |
| | | { |
| | | // Reset status date |
| | | lastStatusChangeDate = new Date(); |
| | | // Reset monitoring counters if reconnection |
| | | if (newStatus == ServerStatus.NOT_CONNECTED_STATUS) |
| | | resetMonitoringCounters(); |
| | | |
| | | // Store new status |
| | | status = newStatus; |
| | | lastStatusChangeDate = new Date(); |
| | | // Reset assured monitoring counters (they are valid for a session with |
| | | // the same status/RS) |
| | | resetAssuredMonitoringCounters(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + serviceID + |
| | |
| | | * Gets the number of updates received in assured safe read mode request. |
| | | * @return The number of updates received in assured safe read mode request. |
| | | */ |
| | | public int getReceivedAssuredSrUpdates() |
| | | public int getAssuredSrReceivedUpdates() |
| | | { |
| | | return receivedAssuredSrUpdates.get(); |
| | | return assuredSrReceivedUpdates.get(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return The number of updates received in assured safe read mode that we |
| | | * acked without error (no replay error). |
| | | */ |
| | | public int getReceivedAssuredSrUpdatesAcked() |
| | | public int getAssuredSrReceivedUpdatesAcked() |
| | | { |
| | | return this.receivedAssuredSrUpdatesAcked.get(); |
| | | return this.assuredSrReceivedUpdatesAcked.get(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return The number of updates received in assured safe read mode that we |
| | | * did not ack due to error (replay error). |
| | | */ |
| | | public int getReceivedAssuredSrUpdatesNotAcked() |
| | | public int getAssuredSrReceivedUpdatesNotAcked() |
| | | { |
| | | return this.receivedAssuredSrUpdatesNotAcked.get(); |
| | | return this.assuredSrReceivedUpdatesNotAcked.get(); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Resets the values of the monitoring counters for assured replication. |
| | | * Resets the values of the monitoring counters. |
| | | */ |
| | | private void resetAssuredMonitoringCounters() |
| | | private void resetMonitoringCounters() |
| | | { |
| | | numProcessedUpdates = new AtomicInteger(0); |
| | | numRcvdUpdates = new AtomicInteger(0); |
| | | numSentUpdates = new AtomicInteger(0); |
| | | |
| | | assuredSrSentUpdates = new AtomicInteger(0); |
| | | assuredSrAcknowledgedUpdates = new AtomicInteger(0); |
| | | assuredSrNotAcknowledgedUpdates = new AtomicInteger(0); |
| | |
| | | assuredSrWrongStatusUpdates = new AtomicInteger(0); |
| | | assuredSrReplayErrorUpdates = new AtomicInteger(0); |
| | | assuredSrServerNotAcknowledgedUpdates = new HashMap<Short,Integer>(); |
| | | receivedAssuredSrUpdates = new AtomicInteger(0); |
| | | receivedAssuredSrUpdatesAcked = new AtomicInteger(0); |
| | | receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0); |
| | | assuredSrReceivedUpdates = new AtomicInteger(0); |
| | | assuredSrReceivedUpdatesAcked = new AtomicInteger(0); |
| | | assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0); |
| | | assuredSdSentUpdates = new AtomicInteger(0); |
| | | assuredSdAcknowledgedUpdates = new AtomicInteger(0); |
| | | assuredSdTimeoutUpdates = new AtomicInteger(0); |
| | |
| | | broker.publish(ackMsg); |
| | | if (replayErrorMsg != null) |
| | | { |
| | | receivedAssuredSrUpdatesNotAcked.incrementAndGet(); |
| | | assuredSrReceivedUpdatesNotAcked.incrementAndGet(); |
| | | } else |
| | | { |
| | | receivedAssuredSrUpdatesAcked.incrementAndGet(); |
| | | assuredSrReceivedUpdatesAcked.incrementAndGet(); |
| | | } |
| | | } |
| | | } else if (assuredMode != AssuredMode.SAFE_DATA_MODE) |