| | |
| | | import java.io.IOException; |
| | | import java.util.List; |
| | | import java.util.Random; |
| | | import java.util.Set; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | |
| | | |
| | | /** |
| | | * Increment the number of updates sent to the server in assured safe data |
| | | * mode. |
| | | */ |
| | | private void incrementAssuredSdSentUpdates() |
| | | { |
| | | assuredSdSentUpdates++; |
| | | } |
| | | |
| | | /** |
| | | * Increment the number of updates sent to the server in assured safe data |
| | | * mode that timed out. |
| | | */ |
| | | public void incrementAssuredSdSentUpdatesTimeout() |
| | |
| | | |
| | | /** |
| | | * Increment the number of updates sent to the server in assured safe read |
| | | * mode. |
| | | */ |
| | | private void incrementAssuredSrSentUpdates() |
| | | { |
| | | assuredSrSentUpdates++; |
| | | } |
| | | |
| | | /** |
| | | * Increment the number of updates sent to the server in assured safe read |
| | | * mode that timed out. |
| | | */ |
| | | public void incrementAssuredSrSentUpdatesTimeout() |
| | |
| | | * Select the next update that must be sent to the server managed by this |
| | | * ServerHandler. |
| | | * |
| | | * @param connectedReplicaIds |
| | | * Ids of replicas to accept when returning a message. |
| | | * @return the next update that must be sent to the server managed by this |
| | | * ServerHandler. |
| | | */ |
| | | public UpdateMsg take(Set<Integer> connectedReplicaIds) |
| | | public UpdateMsg take() |
| | | { |
| | | boolean interrupted = true; |
| | | UpdateMsg msg = getNextMessage(connectedReplicaIds, true); // synchronous:block until msg |
| | | final UpdateMsg msg = getNextMessage(serverId); |
| | | |
| | | acquirePermitInSendWindow(); |
| | | |
| | | if (msg != null) |
| | | { |
| | | incrementOutCount(); |
| | | if (msg.isAssured()) |
| | | { |
| | | incrementAssuredStats(msg); |
| | | } |
| | | return msg; |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | private void acquirePermitInSendWindow() |
| | | { |
| | | boolean acquired = false; |
| | | boolean interrupted = true; |
| | | do |
| | | { |
| | | try |
| | |
| | | // loop until not interrupted |
| | | } |
| | | } while ((interrupted || !acquired) && !shutdownWriter); |
| | | if (msg != null) |
| | | { |
| | | incrementOutCount(); |
| | | } |
| | | |
| | | if (msg.isAssured()) |
| | | { |
| | | if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | incrementAssuredSrSentUpdates(); |
| | | } else if (!isDataServer()) |
| | | { |
| | | incrementAssuredSdSentUpdates(); |
| | | } |
| | | } |
| | | private void incrementAssuredStats(final UpdateMsg msg) |
| | | { |
| | | if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | assuredSrSentUpdates++; |
| | | } |
| | | return msg; |
| | | else if (!isDataServer()) |
| | | { |
| | | assuredSdSentUpdates++; |
| | | } |
| | | } |
| | | |
| | | /** |