| | |
| | | private int maxSendQueue = 0; |
| | | private int maxReceiveDelay = 0; |
| | | private int maxSendDelay = 0; |
| | | private int maxQueueSize = 10000; |
| | | private int maxQueueSize = 5000; |
| | | private int maxQueueBytesSize = maxQueueSize * 100; |
| | | private int restartReceiveQueue; |
| | | private int restartSendQueue; |
| | | private int restartReceiveDelay; |
| | |
| | | private Semaphore sendWindow; |
| | | private int sendWindowSize; |
| | | private boolean flowControl = false; // indicate that the server is |
| | | // flow controled and should |
| | | // be stopped from sending messsages. |
| | | // flow controlled and should |
| | | // be stopped from sending messages. |
| | | private int saturationCount = 0; |
| | | private short replicationServerId; |
| | | |
| | |
| | | super("Server Handler"); |
| | | this.session = session; |
| | | this.maxQueueSize = queueSize; |
| | | this.maxQueueBytesSize = queueSize * 100; |
| | | this.protocolVersion = ProtocolVersion.currentVersion(); |
| | | } |
| | | |
| | |
| | | session.publish(myStartMsg); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | |
| | | /* Until here session is encrypted then it depends on the negociation */ |
| | | /* Until here session is encrypted then it depends on the negotiation */ |
| | | if (!sslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | |
| | | } |
| | | else |
| | | { |
| | | // We are an empty Replicationserver |
| | | // We are an empty Replication Server |
| | | if ((generationId>0)&&(!serverState.isEmpty())) |
| | | { |
| | | // If the LDAP server has already sent changes |
| | |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | int size = msgQueue.size(); |
| | | int size = msgQueue.count(); |
| | | |
| | | if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue)) |
| | | return true; |
| | |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | int queueSize = msgQueue.size(); |
| | | int queueSize = msgQueue.count(); |
| | | if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue)) |
| | | return false; |
| | | if ((source != null) && (source.maxSendQueue > 0) && |
| | |
| | | * the number of updates to be sent is the size of the receive queue. |
| | | */ |
| | | if (isFollowing()) |
| | | return msgQueue.size(); |
| | | return msgQueue.count(); |
| | | else |
| | | { |
| | | /* |
| | |
| | | /* TODO : size should be configurable |
| | | * and larger than max-receive-queue-size |
| | | */ |
| | | while (msgQueue.size() > maxQueueSize) |
| | | while ((msgQueue.count() > maxQueueSize) || |
| | | (msgQueue.bytesCount() > maxQueueBytesSize)) |
| | | { |
| | | setFollowing(false); |
| | | msgQueue.removeFirst(); |
| | |
| | | |
| | | // The loop below relies on the fact that it is sorted based |
| | | // on the currentChange of each iterator to consider the next |
| | | // change accross all servers. |
| | | // change across all servers. |
| | | // Hence it is necessary to remove and eventual add again an iterator |
| | | // when looping in order to keep consistent the order of the |
| | | // iterators (see ReplicationIteratorComparator. |
| | | while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100)) |
| | | while (!iteratorSortedSet.isEmpty() && |
| | | (lateQueue.count()<100) && |
| | | (lateQueue.bytesCount()<50000) ) |
| | | { |
| | | ReplicationIterator iterator = iteratorSortedSet.first(); |
| | | iteratorSortedSet.remove(iterator); |
| | |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | if (msgQueue.size() < maxQueueSize) |
| | | if ((msgQueue.count() < maxQueueSize) && |
| | | (msgQueue.bytesCount() < maxQueueBytesSize)) |
| | | { |
| | | setFollowing(true); |
| | | } |
| | |
| | | { |
| | | if (msgQueue.contains(msg)) |
| | | { |
| | | /* we finally catched up with the regular queue */ |
| | | /* we finally catch up with the regular queue */ |
| | | setFollowing(true); |
| | | lateQueue.clear(); |
| | | UpdateMessage msg1; |
| | |
| | | * Update the serverState with the last message sent. |
| | | * |
| | | * @param msg the last update sent. |
| | | * @return boolean indicating if the update was meaningfull. |
| | | * @return boolean indicating if the update was meaningful. |
| | | */ |
| | | public boolean updateServerState(UpdateMessage msg) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | attributes.add( |
| | | new Attribute("queue-size", String.valueOf(msgQueue.count()))); |
| | | attributes.add( |
| | | new Attribute( |
| | | "queue-size-bytes", String.valueOf(msgQueue.bytesCount()))); |
| | | attributes.add( |
| | | new Attribute( |
| | | "following", String.valueOf(following))); |
| | | |
| | | // Deprecated |
| | | attributes.add(new Attribute("max-waiting-changes", |
| | | String.valueOf(maxQueueSize))); |