| | |
| | | private int maxSendQueue = 0; |
| | | private int maxReceiveDelay = 0; |
| | | private int maxSendDelay = 0; |
| | | private int maxQueueSize = 10000; |
| | | private int restartReceiveQueue; |
| | | private int restartSendQueue; |
| | | private int restartReceiveDelay; |
| | |
| | | /** |
| | | * Creates a new server handler instance with the provided socket. |
| | | * |
| | | * @param session The ProtocolSession used by the ServerHandler to |
| | | * @param session The ProtocolSession used by the ServerHandler to |
| | | * communicate with the remote entity. |
| | | * @param queueSize The maximum number of update that will be kept |
| | | * in memory by this ServerHandler. |
| | | */ |
| | | public ServerHandler(ProtocolSession session) |
| | | public ServerHandler(ProtocolSession session, int queueSize) |
| | | { |
| | | super("Server Handler"); |
| | | this.session = session; |
| | | this.maxQueueSize = queueSize; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | /* |
| | | * TODO : When the server is not able to follow, the msgQueue |
| | | * may become too large and therefore won't contain all the |
| | | * changes. Some changes may only be stored in the backing DB |
| | | * of the servers. |
| | | * The calculation should be done by asking to the each dbHandler |
| | | * how many changes need to be replicated and making the sum |
| | | * For now just return maxint in this case |
| | | */ |
| | | /* |
| | | * When the server is up to date or close to be up to date, |
| | | * the number of updates to be sent is the size of the receive queue. |
| | | */ |
| | | if (isFollowing()) |
| | | return msgQueue.size(); |
| | | else |
| | | return Integer.MAX_VALUE; |
| | | { |
| | | /* |
| | | * When the server is not able to follow, the msgQueue |
| | | * may become too large and therefore won't contain all the |
| | | * changes. Some changes may only be stored in the backing DB |
| | | * of the servers. |
| | | * The total size of teh receieve queue is calculated by doing |
| | | * the sum of the number of missing changes for every dbHandler. |
| | | */ |
| | | int totalCount = 0; |
| | | ServerState dbState = changelogCache.getDbServerState(); |
| | | for (short id : dbState) |
| | | { |
| | | int max = dbState.getMaxChangeNumber(id).getSeqnum(); |
| | | ChangeNumber currentChange = serverState.getMaxChangeNumber(id); |
| | | if (currentChange != null) |
| | | { |
| | | int current = currentChange.getSeqnum(); |
| | | if (current == max) |
| | | { |
| | | } |
| | | else if (current < max) |
| | | { |
| | | totalCount += max - current; |
| | | } |
| | | else |
| | | { |
| | | totalCount += Integer.MAX_VALUE - (current - max) + 1; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | totalCount += max; |
| | | } |
| | | } |
| | | return totalCount; |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | /* TODO : size should be configurable |
| | | * and larger than max-receive-queue-size |
| | | */ |
| | | while (msgQueue.size() > 10000) |
| | | while (msgQueue.size() > maxQueueSize) |
| | | { |
| | | following = false; |
| | | msgQueue.removeFirst(); |
| | |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | if (msgQueue.size() < 10000) |
| | | if (msgQueue.size() < maxQueueSize) |
| | | { |
| | | following = true; |
| | | } |
| | |
| | | baseDn.toString())); |
| | | attributes.add(new Attribute("waiting-changes", |
| | | String.valueOf(getRcvMsgQueueSize()))); |
| | | attributes.add(new Attribute("max-waiting-changes", |
| | | String.valueOf(maxQueueSize))); |
| | | attributes.add(new Attribute("update-waiting-acks", |
| | | String.valueOf(getWaitingAckSize()))); |
| | | attributes.add(new Attribute("update-sent", |