| | |
| | | */ |
| | | protected int inCount = 0; |
| | | /** |
| | | * Specifies the max receive queue for this handler. |
| | | * When greater than 0, isSaturated reports saturation as soon as the |
| | | * message queue count of this handler reaches this value; a value of 0 |
| | | * or less disables that check. |
| | | */ |
| | | protected int maxReceiveQueue = 0; |
| | | /** |
| | | * Specifies the max send queue for this handler. |
| | | * isSaturated reads this value from the source handler it is given: when |
| | | * greater than 0, saturation is reported as soon as the message queue |
| | | * count of the checked handler reaches it; 0 or less disables that check. |
| | | */ |
| | | protected int maxSendQueue = 0; |
| | | /** |
| | | * Specifies the max receive delay for this handler. |
| | | * When greater than 0, isSaturated reports saturation as soon as the |
| | | * difference between the getTimeSec() value of the supplied change number |
| | | * and that of the first queued update reaches this value (presumably |
| | | * seconds, going by the accessor name); 0 or less disables that check. |
| | | */ |
| | | protected int maxReceiveDelay = 0; |
| | | /** |
| | | * Specifies the max send delay for this handler. |
| | | * isSaturated reads this value from the source handler it is given and |
| | | * compares it with the same time difference used for maxReceiveDelay; |
| | | * 0 or less disables that check. |
| | | */ |
| | | protected int maxSendDelay = 0; |
| | | /** |
| | | * Specifies the max queue size for this handler. |
| | | */ |
| | | protected int maxQueueSize = 5000; |
| | |
| | | */ |
| | | protected int maxQueueBytesSize = maxQueueSize * 100; |
| | | /** |
| | | * Specifies the max restart receive queue for this handler. |
| | | * While maxReceiveQueue is greater than 0, restartAfterSaturation refuses |
| | | * to restart as long as the message queue count is at or above this value. |
| | | */ |
| | | protected int restartReceiveQueue; |
| | | /** |
| | | * Specifies the max restart send queue for this handler. |
| | | * restartAfterSaturation reads this value from the (non-null) source |
| | | * handler: while that handler's maxSendQueue is greater than 0, restart is |
| | | * refused as long as the message queue count is at or above this value. |
| | | */ |
| | | protected int restartSendQueue; |
| | | /** |
| | | * Specifies the max restart receive delay for this handler. |
| | | * While maxReceiveDelay is greater than 0, restartAfterSaturation refuses |
| | | * to restart as long as the getTimeSec() difference between the last and |
| | | * the first queued updates is at or above this value. |
| | | */ |
| | | protected int restartReceiveDelay; |
| | | /** |
| | | * Specifies the max restart send delay for this handler. |
| | | * restartAfterSaturation reads this value from the (non-null) source |
| | | * handler: while that handler's maxSendDelay is greater than 0, restart is |
| | | * refused as long as the getTimeSec() difference between the last and the |
| | | * first queued updates is at or above this value. |
| | | */ |
| | | protected int restartSendDelay; |
| | | /** |
| | | * Specifies whether the consumer is following the producer (is not late). |
| | | */ |
| | | protected boolean following = false; |
| | |
| | | */ |
| | | private String serviceId = null; |
| | | /** |
| | | * Specifies whether the server is flow controlled and should be stopped from |
| | | * sending messages. |
| | | * Written by setSaturated, which is called with true on the source handler |
| | | * when isSaturated reports saturation. |
| | | */ |
| | | protected boolean flowControl = false; |
| | | /** |
| | | * Specifies whether the consumer is still active or not. |
| | | * If not active, the handler will not return any message. |
| | | * Called at the beginning of shutdown process. |
| | |
| | | msgQueue.removeFirst(); |
| | | } |
| | | } |
| | | |
| | | if (isSaturated(update.getChangeNumber(), sourceHandler)) |
| | | { |
| | | sourceHandler.setSaturated(true); |
| | | } |
| | | |
| | | } |
| | | /** |
| | | * Set the shut down flag to true and returns the previous value of the flag. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check if this server is saturated (this server has already been |
| | | * sent a bunch of updates and has not processed them, so they are staying |
| | | * in the message queue for this server, and the size of the queue |
| | | * for this server is above the configured limit). |
| | | * |
| | | * The limit can be defined in number of updates or with a maximum delay. |
| | | * |
| | | * @param changeNumber The changenumber to use to make the delay calculations. |
| | | * @param sourceHandler The ServerHandler which is sending the update. |
| | | * @return true if saturated, false if not saturated. |
| | | */ |
| | | public boolean isSaturated(ChangeNumber changeNumber, |
| | |     MessageHandler sourceHandler) |
| | | { |
| | |   synchronized (msgQueue) |
| | |   { |
| | |     final int queued = msgQueue.count(); |
| | | |
| | |     // Count-based limits: the receive limit of this handler is checked |
| | |     // first, then the send limit of the handler that produced the update. |
| | |     final boolean overReceiveQueue = |
| | |         maxReceiveQueue > 0 && queued >= maxReceiveQueue; |
| | |     if (overReceiveQueue |
| | |         || (sourceHandler.maxSendQueue > 0 |
| | |             && queued >= sourceHandler.maxSendQueue)) |
| | |     { |
| | |       return true; |
| | |     } |
| | | |
| | |     // Delay-based limits only make sense when something is queued. |
| | |     if (msgQueue.isEmpty()) |
| | |     { |
| | |       return false; |
| | |     } |
| | |     final UpdateMsg oldest = msgQueue.first(); |
| | |     if (oldest == null) |
| | |     { |
| | |       return false; |
| | |     } |
| | | |
| | |     // Distance between the incoming change and the oldest queued update. |
| | |     final long lag = |
| | |         changeNumber.getTimeSec() - oldest.getChangeNumber().getTimeSec(); |
| | |     if (maxReceiveDelay > 0 && lag >= maxReceiveDelay) |
| | |     { |
| | |       return true; |
| | |     } |
| | |     return sourceHandler.maxSendDelay > 0 |
| | |         && lag >= sourceHandler.maxSendDelay; |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Check that the size of the Server Handler messages Queue has lowered |
| | | * below the limit and therefore allowing the reception of messages |
| | | * from other servers to restart. |
| | | * @param source The ServerHandler which was sending the update. |
| | | * can be null. |
| | | * @return true if the processing can restart |
| | | */ |
| | | public boolean restartAfterSaturation(MessageHandler source) |
| | | { |
| | |   synchronized (msgQueue) |
| | |   { |
| | |     final int pending = msgQueue.count(); |
| | | |
| | |     // Queue still too long with respect to the receive restart threshold. |
| | |     if (maxReceiveQueue > 0 && pending >= restartReceiveQueue) |
| | |     { |
| | |       return false; |
| | |     } |
| | |     // Same check against the thresholds of the sender, when one is given. |
| | |     if (source != null |
| | |         && source.maxSendQueue > 0 |
| | |         && pending >= source.restartSendQueue) |
| | |     { |
| | |       return false; |
| | |     } |
| | | |
| | |     // Nothing queued: no delay can be measured, processing may restart. |
| | |     if (msgQueue.isEmpty()) |
| | |     { |
| | |       return true; |
| | |     } |
| | |     final UpdateMsg oldest = msgQueue.first(); |
| | |     final UpdateMsg newest = msgQueue.last(); |
| | |     if (oldest == null || newest == null) |
| | |     { |
| | |       return true; |
| | |     } |
| | | |
| | |     // Time span covered by the updates currently sitting in the queue. |
| | |     final long span = newest.getChangeNumber().getTimeSec() |
| | |         - oldest.getChangeNumber().getTimeSec(); |
| | |     if (maxReceiveDelay > 0 && span >= restartReceiveDelay) |
| | |     { |
| | |       return false; |
| | |     } |
| | |     final boolean senderStillLate = source != null |
| | |         && source.maxSendDelay > 0 |
| | |         && span >= source.restartSendDelay; |
| | |     return !senderStillLate; |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Set that the consumer is now becoming inactive and thus getNextMessage |
| | | * should not return any UpdateMsg any more. |
| | | * @param active the provided state of the consumer. |
| | |
| | | this.following = following; |
| | | } |
| | | |
| | | /** |
| | | * Records the saturation state of this handler in the flowControl flag. |
| | | * @param value the new value of the flowControl flag. |
| | | */ |
| | | private void setSaturated(boolean value) |
| | | { |
| | |   this.flowControl = value; |
| | | } |
| | | |
| | | /** |
| | | * Set the initial value of the serverState for this handler. |
| | | * Expected to be done once, then the state will be updated using |