| | |
| | | /** |
| | | * This class implements a buffering/producer/consumer mechanism of |
| | | * replication changes (UpdateMsg) used inside the replication server. |
| | | * It logically represents another RS than the current one, and is used when the |
| | | * other RS is brought online and needs to catch up with the changes in the |
| | | * current RS. |
| | | * |
| | | * MessageHandlers are registered into Replication server domains. |
| | | * When an update message is received by a domain, the domain forwards |
| | |
| | | /* TODO : size should be configurable |
| | | * and larger than max-receive-queue-size |
| | | */ |
| | | while ((msgQueue.count() > maxQueueSize) || |
| | | (msgQueue.bytesCount() > maxQueueBytesSize)) |
| | | while (isMsgQueueAboveThreshold()) |
| | | { |
| | | following = false; |
| | | msgQueue.removeFirst(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Tells whether the message queue currently exceeds one of its limits. |
| | | * |
| | | * @return true when the number of queued messages is greater than |
| | | * maxQueueSize, or when the queued bytes count is greater than |
| | | * maxQueueBytesSize; false otherwise |
| | | */ |
| | | private boolean isMsgQueueAboveThreshold() |
| | | { |
| | | // The messages count is checked first; the bytes count is only |
| | | // consulted when the messages count is within its limit. |
| | | if (msgQueue.count() > maxQueueSize) |
| | | { |
| | | return true; |
| | | } |
| | | return msgQueue.bytesCount() > maxQueueBytesSize; |
| | | } |
| | | |
| | | private boolean isMsgQueueBelowThreshold() |
| | | { |
| | | return !isMsgQueueAboveThreshold(); |
| | | } |
| | | |
| | | /** |
| | | * Raises the shut down flag and reports the value it held beforehand. |
| | | * |
| | | * @return the value of the shut down flag before this call |
| | | */ |
| | | public boolean engageShutdown() |
| | | { |
| | | // shuttingDown is a thread safe boolean: the previous value is read |
| | | // and the new value is written through the single getAndSet call. |
| | | final boolean wasShuttingDown = shuttingDown.getAndSet(true); |
| | | return wasShuttingDown; |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /* |
| | | * If the late queue is empty then we could not find any |
| | | * messages in the replication log so the remote server is not |
| | | * late anymore. |
| | | * If the late queue is empty then we could not find any messages in |
| | | * the replication log so the remote server is not late anymore. |
| | | */ |
| | | if (lateQueue.isEmpty()) |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | if ((msgQueue.count() < maxQueueSize) && |
| | | (msgQueue.bytesCount() < maxQueueBytesSize)) |
| | | // Ensure we are below threshold so this server will follow the |
| | | // msgQueue without fearing the msgQueue gets trimmed |
| | | if (isMsgQueueBelowThreshold()) |
| | | { |
| | | following = true; |
| | | } |
| | |
| | | /* we finally catch up with the regular queue */ |
| | | following = true; |
| | | lateQueue.clear(); |
| | | UpdateMsg msg1; |
| | | do |
| | | { |
| | | msg1 = msgQueue.removeFirst(); |
| | | } while (!msg.getCSN().equals(msg1.getCSN())); |
| | | msgQueue.consumeUpTo(msg); |
| | | updateServerState(msg); |
| | | return msg1; |
| | | return msg; |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | } |
| | | /* |
| | | * Need to loop because following flag may have gone to false between |
| | | * the first check at the beginning of this method |
| | | * and the second check just above. |
| | | * Need to loop because following flag may have gone to false between the |
| | | * first check at the beginning of this method and the second check just |
| | | * above. |
| | | */ |
| | | } |
| | | return null; |
| | |
| | | new TreeSet<ReplicaDBCursor>(); |
| | | for (int serverId : replicationServerDomain.getServerIds()) |
| | | { |
| | | // get the last already sent CN from that server to get a cursor |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCsn = serverState.getCSN(serverId); |
| | | addCursorIfNotEmpty(results, |
| | | replicationServerDomain.getCursorFrom(serverId, lastCsn)); |