| | |
| | | private ServerReader reader; |
| | | private Semaphore sendWindow; |
| | | private int sendWindowSize; |
| | | private boolean flowControl = false; // indicates that the server is |
| | | // flow controlled and should |
| | | // be stopped from sending messages. |
| | | private int saturationCount = 0; |
| | | |
| | | private static Map<ChangeNumber, ChangelogAckMessageList> |
| | | changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>(); |
| | |
| | | * managed by this ServerHandler. |
| | | * |
| | | * @param update The update that must be added to the list of updates. |
| | | * @param sourceHandler The server that sent the update. |
| | | */ |
| | | public void add(UpdateMessage update) |
| | | public void add(UpdateMessage update, ServerHandler sourceHandler) |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | |
| | | msgQueue.removeFirst(); |
| | | } |
| | | } |
| | | |
| | | if (isSaturated(update.getChangeNumber(), sourceHandler)) |
| | | { |
| | | sourceHandler.setSaturated(true); |
| | | } |
| | | |
| | | } |
| | | |
| | | private void setSaturated(boolean value) |
| | | { |
| | | flowControl = value; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | boolean interrupted = true; |
| | | UpdateMessage msg = getnextMessage(); |
| | | |
| | | /* |
| | | * When we remove a message from the queue we need to check if another |
| | | * server is waiting in flow control because this queue was too long. |
| | | * This check might cause a performance penalty and therefore it |
| | | * is not done for every message removed but only every few messages. |
| | | */ |
| | | if (++saturationCount > 10) |
| | | { |
| | | saturationCount = 0; |
| | | try |
| | | { |
| | | changelogCache.checkAllSaturation(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | } |
| | | } |
| | | do { |
| | | try |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Decrement the protocol window, then check if it is necessary |
| | | * to send a WindowMessage and send it. |
| | | * |
| | | * @throws IOException when the session becomes unavailable. |
| | | */ |
| | | public synchronized void decAndCheckWindow() throws IOException |
| | | { |
| | | rcvWindow--; |
| | | checkWindow(); |
| | | } |
| | | |
| | | /** |
| | | * Check the protocol window and send WindowMessage if necessary. |
| | | * |
| | | * @throws IOException when the session becomes unavailable. |
| | | */ |
| | | public synchronized void checkWindow() throws IOException |
| | | { |
| | | rcvWindow--; |
| | | if (rcvWindow < rcvWindowSizeHalf) |
| | | { |
| | | WindowMessage msg = new WindowMessage(rcvWindowSizeHalf); |
| | | session.publish(msg); |
| | | outAckCount++; |
| | | rcvWindow += rcvWindowSizeHalf; |
| | | if (flowControl) |
| | | { |
| | | if (changelogCache.restartAfterSaturation(this)) |
| | | { |
| | | flowControl = false; |
| | | } |
| | | } |
| | | if (!flowControl) |
| | | { |
| | | WindowMessage msg = new WindowMessage(rcvWindowSizeHalf); |
| | | session.publish(msg); |
| | | outAckCount++; |
| | | rcvWindow += rcvWindowSizeHalf; |
| | | } |
| | | } |
| | | } |
| | | |