| | |
| | | // Safe Data level (used when assuredMode is SAFE_DATA) |
| | | private byte assuredSdLevel = (byte)1; |
| | | // The timeout in ms that should be used, when waiting for assured acks |
| | | private long assuredTimeout = 1000; |
| | | private long assuredTimeout = 2000; |
| | | |
| | | // Group id |
| | | private byte groupId = (byte)1; |
| | |
| | | // format: <server id>:<number of failed updates> |
| | | private Map<Short,Integer> assuredSrServerNotAcknowledgedUpdates = |
| | | new HashMap<Short,Integer>(); |
| | | // Number of updates received in Assured Mode, Safe Read request |
| | | private AtomicInteger receivedAssuredSrUpdates = new AtomicInteger(0); |
| | | // Number of updates received in Assured Mode, Safe Read request that we have |
| | | // acked without errors |
| | | private AtomicInteger receivedAssuredSrUpdatesAcked = new AtomicInteger(0); |
| | | // Number of updates received in Assured Mode, Safe Read request that we have |
| | | // acked with errors |
| | | private AtomicInteger receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0); |
| | | // Number of updates sent in Assured Mode, Safe Data |
| | | private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0); |
| | | // Number of updates sent in Assured Mode, Safe Data, that have been |
| | |
| | | } |
| | | |
| | | numRcvdUpdates.incrementAndGet(); |
| | | return update; |
| | | } |
| | | |
| | | /** |
| | | * Wait for the processing of an assured message. |
| | | * |
| | | * @param msg The UpdateMsg for which we are waiting for an ack. |
| | | * @throws TimeoutException When the configured timeout occurs waiting for the |
| | | * ack. |
| | | */ |
| | | private void waitForAck(UpdateMsg msg) throws TimeoutException |
| | | { |
| | | // Wait for the ack to be received, timing out if necessary |
| | | long startTime = System.currentTimeMillis(); |
| | | synchronized (msg) |
| | | if (update.isAssured() && (update.getAssuredMode() == |
| | | AssuredMode.SAFE_READ_MODE)) |
| | | { |
| | | ChangeNumber cn = msg.getChangeNumber(); |
| | | while (waitingAckMsgs.containsKey(cn)) |
| | | { |
| | | try |
| | | { |
| | | // WARNING: this timeout may be difficult to optimize: too low, it |
| | | // may use too much CPU, too high, it may penalize performance... |
| | | msg.wait(10); |
| | | } catch (InterruptedException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("waitForAck method interrupted for replication " + |
| | | "serviceID: " + serviceID); |
| | | } |
| | | break; |
| | | } |
| | | // Timeout ? |
| | | if ( (System.currentTimeMillis() - startTime) >= assuredTimeout ) |
| | | { |
| | | // Timeout occured, be sure that ack is not being received and if so, |
| | | // remove the update from the wait list, log the timeout error and |
| | | // also update assured monitoring counters |
| | | UpdateMsg update; |
| | | synchronized (waitingAckMsgs) |
| | | { |
| | | update = waitingAckMsgs.remove(cn); |
| | | } |
| | | |
| | | if (update != null) |
| | | { |
| | | // No luck, this is a real timeout |
| | | // Increment assured replication monitoring counters |
| | | switch (msg.getAssuredMode()) |
| | | { |
| | | case SAFE_READ_MODE: |
| | | assuredSrNotAcknowledgedUpdates.incrementAndGet(); |
| | | assuredSrTimeoutUpdates.incrementAndGet(); |
| | | // Increment number of errors for our RS |
| | | updateAssuredErrorsByServer( |
| | | assuredSrServerNotAcknowledgedUpdates, |
| | | broker.getRsServerId()); |
| | | break; |
| | | case SAFE_DATA_MODE: |
| | | assuredSdTimeoutUpdates.incrementAndGet(); |
| | | // Increment number of errors for our RS |
| | | updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates, |
| | | broker.getRsServerId()); |
| | | break; |
| | | default: |
| | | // Should not happen |
| | | } |
| | | |
| | | throw new TimeoutException("No ack received for message cn: " + cn + |
| | | " and replication servceID: " + serviceID + " after " + |
| | | assuredTimeout + " ms."); |
| | | } else |
| | | { |
| | | // Ack received just before timeout limit: we can exit |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | receivedAssuredSrUpdates.incrementAndGet(); |
| | | } |
| | | return update; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public Map<Short, Integer> getAssuredSrServerNotAcknowledgedUpdates() |
| | | { |
| | | return assuredSrServerNotAcknowledgedUpdates; |
| | | // Clone a snapshot with synchronized section to have a consistent view in |
| | | // monitoring |
| | | Map<Short, Integer> snapshot = new HashMap<Short, Integer>(); |
| | | synchronized(assuredSrServerNotAcknowledgedUpdates) |
| | | { |
| | | Set<Short> keySet = assuredSrServerNotAcknowledgedUpdates.keySet(); |
| | | for (Short serverId : keySet) |
| | | { |
| | | Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId); |
| | | snapshot.put(serverId, i); |
| | | } |
| | | } |
| | | return snapshot; |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of updates that were received with an assured safe |
| | | * read mode request. |
| | | * @return The current value of the counter of updates received in assured |
| | | * safe read mode request. |
| | | */ |
| | | public int getReceivedAssuredSrUpdates() |
| | | { |
| | | final int received = this.receivedAssuredSrUpdates.get(); |
| | | return received; |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of updates received in assured safe read mode for |
| | | * which an ack without error (no replay error) was sent. |
| | | * @return The current value of the counter of updates received in assured |
| | | * safe read mode that we acked without error (no replay error). |
| | | */ |
| | | public int getReceivedAssuredSrUpdatesAcked() |
| | | { |
| | | final int acked = receivedAssuredSrUpdatesAcked.get(); |
| | | return acked; |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of updates received in assured safe read mode that |
| | | * were not acked because of an error (replay error). |
| | | * @return The current value of the counter of updates received in assured |
| | | * safe read mode that we did not ack due to error (replay error). |
| | | */ |
| | | public int getReceivedAssuredSrUpdatesNotAcked() |
| | | { |
| | | final int notAcked = receivedAssuredSrUpdatesNotAcked.get(); |
| | | return notAcked; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public Map<Short, Integer> getAssuredSdServerTimeoutUpdates() |
| | | { |
| | | return assuredSdServerTimeoutUpdates; |
| | | // Clone a snapshot with synchronized section to have a consistent view in |
| | | // monitoring |
| | | Map<Short, Integer> snapshot = new HashMap<Short, Integer>(); |
| | | synchronized(assuredSdServerTimeoutUpdates) |
| | | { |
| | | Set<Short> keySet = assuredSdServerTimeoutUpdates.keySet(); |
| | | for (Short serverId : keySet) |
| | | { |
| | | Integer i = assuredSdServerTimeoutUpdates.get(serverId); |
| | | snapshot.put(serverId, i); |
| | | } |
| | | } |
| | | return snapshot; |
| | | } |
| | | |
| | | /** |
| | |
| | | assuredSrWrongStatusUpdates = new AtomicInteger(0); |
| | | assuredSrReplayErrorUpdates = new AtomicInteger(0); |
| | | assuredSrServerNotAcknowledgedUpdates = new HashMap<Short,Integer>(); |
| | | receivedAssuredSrUpdates = new AtomicInteger(0); |
| | | receivedAssuredSrUpdatesAcked = new AtomicInteger(0); |
| | | receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0); |
| | | assuredSdSentUpdates = new AtomicInteger(0); |
| | | assuredSdAcknowledgedUpdates = new AtomicInteger(0); |
| | | assuredSdTimeoutUpdates = new AtomicInteger(0); |
| | |
| | | * @param windowSize The window size that this domain should use. |
| | | * @param heartbeatInterval The heartbeatInterval that this domain should |
| | | * use. |
| | | * @param groupId The new group id to use |
| | | */ |
| | | public void changeConfig( |
| | | Collection<String> replicationServers, |
| | | int windowSize, |
| | | long heartbeatInterval) |
| | | long heartbeatInterval, |
| | | byte groupId) |
| | | { |
| | | this.groupId = groupId; |
| | | |
| | | if (broker != null) |
| | | { |
| | | if (broker.changeConfig( |
| | | replicationServers, windowSize, heartbeatInterval)) |
| | | replicationServers, windowSize, heartbeatInterval, groupId)) |
| | | { |
| | | disableService(); |
| | | enableService(); |
| | |
| | | ackMsg.setFailedServers(idList); |
| | | } |
| | | broker.publish(ackMsg); |
| | | if (replayErrorMsg != null) |
| | | { |
| | | receivedAssuredSrUpdatesNotAcked.incrementAndGet(); |
| | | } else |
| | | { |
| | | receivedAssuredSrUpdatesAcked.incrementAndGet(); |
| | | } |
| | | } |
| | | } else if (assuredMode != AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Publish an {@link UpdateMsg} to the Replication Service. |
| | | * <p> |
| | | * The Replication Service will handle the delivery of this {@link UpdateMsg} |
| | | * to all the participants of this Replication Domain. |
| | | * These members will be receive this {@link UpdateMsg} through a call |
| | | * of the {@link #processUpdate(UpdateMsg)} message. |
| | | * Prepare a message if it is to be sent in assured mode. |
| | | * If the assured mode is enabled, this method should be called before |
| | | * publish(UpdateMsg msg) method. This will configure the update accordingly |
| | | * before it is sent and will prepare the mechanism that will block until the |
| | | * matching ack is received. To wait for the ack after publish call, use |
| | | * the waitForAckIfAssuredEnabled() method. |
| | | * The expected typical usage in a service inheriting from this class is |
| | | * the following sequence: |
| | | * UpdateMsg msg = xxx; |
| | | * prepareWaitForAckIfAssuredEnabled(msg); |
| | | * publish(msg); |
| | | * waitForAckIfAssuredEnabled(msg); |
| | | * |
| | | * @param msg The UpdateMsg that should be pushed. |
| | | * @throws TimeoutException When assured replication is enabled and the |
| | | * configured timeout occurs when blocked waiting for the ack. |
| | | * Note: prepareWaitForAckIfAssuredEnabled and waitForAckIfAssuredEnabled have |
| | | * no effect if assured replication is disabled. |
| | | * Note: this mechanism should not be used if using publish(byte[] msg) |
| | | * version as usage of these methods is already hidden inside. |
| | | * |
| | | * @param msg The update message to be sent soon. |
| | | */ |
| | | public void publish(UpdateMsg msg) throws TimeoutException |
| | | protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg) |
| | | { |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | |
| | | // If assured configured, set message accordingly to request an ack in the |
| | | // right assured mode. |
| | | // No ack requested for a RS with a different group id. Assured replication |
| | | // suported for the same locality, i.e: a topology working in the same |
| | | // geographical location). If we are connected to a RS which is not in our |
| | | // locality, no need to ask for an ack. |
| | | if ( assured && ( rsGroupId == groupId ) ) |
| | | /* |
| | | * If assured configured, set message accordingly to request an ack in the |
| | | * right assured mode. |
| | | * No ack requested for a RS with a different group id. Assured |
| | | * replication is supported for the same locality (i.e. a topology working |
| | | * in the same geographical location). If we are connected to a RS which is |
| | | * not in our locality, no need to ask for an ack. |
| | | */ |
| | | if (assured && (rsGroupId == groupId)) |
| | | { |
| | | msg.setAssured(true); |
| | | msg.setAssuredMode(assuredMode); |
| | |
| | | waitingAckMsgs.put(msg.getChangeNumber(), msg); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Publish the update |
| | | broker.publish(msg); |
| | | state.update(msg.getChangeNumber()); |
| | | numSentUpdates.incrementAndGet(); |
| | | /** |
| | | * Wait for the processing of an assured message after it has been sent, if |
| | | * assured replication is configured, otherwise, do nothing. |
| | | * The prepareWaitForAckIfAssuredEnabled method should have been called |
| | | * before, see its comment for the full picture. |
| | | * |
| | | * @param msg The UpdateMsg for which we are waiting for an ack. |
| | | * @throws TimeoutException When the configured timeout occurs waiting for the |
| | | * ack. |
| | | */ |
| | | protected void waitForAckIfAssuredEnabled(UpdateMsg msg) |
| | | throws TimeoutException |
| | | { |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | |
| | | // If assured mode configured, wait for acknowledgement for the just sent |
| | | // message |
| | | if ( assured && ( rsGroupId == groupId ) ) |
| | | if (assured && (rsGroupId == groupId)) |
| | | { |
| | | // Increment assured replication monitoring counters |
| | | switch(assuredMode) |
| | | switch (assuredMode) |
| | | { |
| | | case SAFE_READ_MODE: |
| | | assuredSrSentUpdates.incrementAndGet(); |
| | |
| | | assuredSdSentUpdates.incrementAndGet(); |
| | | break; |
| | | default: |
| | | // Should not happen |
| | | // Should not happen |
| | | } |
| | | |
| | | // Now wait for ack matching the sent assured update |
| | | waitForAck(msg); |
| | | } else |
| | | { |
| | | // Not assured or bad group id, return immediately |
| | | return; |
| | | } |
| | | |
| | | // Wait for the ack to be received, timing out if necessary |
| | | long startTime = System.currentTimeMillis(); |
| | | synchronized (msg) |
| | | { |
| | | ChangeNumber cn = msg.getChangeNumber(); |
| | | while (waitingAckMsgs.containsKey(cn)) |
| | | { |
| | | try |
| | | { |
| | | // WARNING: this timeout may be difficult to optimize: too low, it |
| | | // may use too much CPU, too high, it may penalize performance... |
| | | msg.wait(10); |
| | | } catch (InterruptedException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("waitForAck method interrupted for replication " + |
| | | "serviceID: " + serviceID); |
| | | } |
| | | break; |
| | | } |
| | | // Timeout ? |
| | | if ( (System.currentTimeMillis() - startTime) >= assuredTimeout ) |
| | | { |
| | | // Timeout occured, be sure that ack is not being received and if so, |
| | | // remove the update from the wait list, log the timeout error and |
| | | // also update assured monitoring counters |
| | | UpdateMsg update; |
| | | synchronized (waitingAckMsgs) |
| | | { |
| | | update = waitingAckMsgs.remove(cn); |
| | | } |
| | | |
| | | if (update != null) |
| | | { |
| | | // No luck, this is a real timeout |
| | | // Increment assured replication monitoring counters |
| | | switch (msg.getAssuredMode()) |
| | | { |
| | | case SAFE_READ_MODE: |
| | | assuredSrNotAcknowledgedUpdates.incrementAndGet(); |
| | | assuredSrTimeoutUpdates.incrementAndGet(); |
| | | // Increment number of errors for our RS |
| | | updateAssuredErrorsByServer( |
| | | assuredSrServerNotAcknowledgedUpdates, |
| | | broker.getRsServerId()); |
| | | break; |
| | | case SAFE_DATA_MODE: |
| | | assuredSdTimeoutUpdates.incrementAndGet(); |
| | | // Increment number of errors for our RS |
| | | updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates, |
| | | broker.getRsServerId()); |
| | | break; |
| | | default: |
| | | // Should not happen |
| | | } |
| | | |
| | | throw new TimeoutException("No ack received for message cn: " + cn + |
| | | " and replication servceID: " + serviceID + " after " + |
| | | assuredTimeout + " ms."); |
| | | } else |
| | | { |
| | | // Ack received just before timeout limit: we can exit |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Sends an {@link UpdateMsg} to the Replication Service. |
| | | * <p> |
| | | * The Replication Service takes care of delivering this {@link UpdateMsg} |
| | | * to all the participants of this Replication Domain. Each of these |
| | | * members will receive it through a call of the |
| | | * {@link #processUpdate(UpdateMsg)} method. |
| | | * |
| | | * @param msg The UpdateMsg that should be pushed. |
| | | */ |
| | | public void publish(UpdateMsg msg) |
| | | { |
| | | // Hand the update over to the broker first |
| | | broker.publish(msg); |
| | | |
| | | // Then record its change number in the state and count it as sent |
| | | final ChangeNumber sentCn = msg.getChangeNumber(); |
| | | state.update(sentCn); |
| | | numSentUpdates.incrementAndGet(); |
| | | } |
| | | |
| | | /** |
| | |
| | | public void publish(byte[] msg) |
| | | { |
| | | UpdateMsg update = new UpdateMsg(generator.newChangeNumber(), msg); |
| | | |
| | | // If assured replication is configured, this will prepare blocking |
| | | // mechanism. If assured replication is disabled, this returns |
| | | // immediately |
| | | prepareWaitForAckIfAssuredEnabled(update); |
| | | |
| | | // Send the update exactly once; waiting for the ack is handled |
| | | // separately below |
| | | publish(update); |
| | | |
| | | try |
| | | { |
| | | // If assured replication is enabled, this will wait for the matching |
| | | // ack or time out. If assured replication is disabled, this returns |
| | | // immediately |
| | | waitForAckIfAssuredEnabled(update); |
| | | } catch (TimeoutException ex) |
| | | { |
| | | // This exception may only be raised if assured replication is |
| | | // enabled |
| | | // NOTE(review): msg is a byte[], so msg.toString() yields the array |
| | | // identity string rather than its content - confirm this is intended |
| | | Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString( |
| | | assuredTimeout), msg.toString()); |
| | | logError(errorMsg); |
| | | } |
| | | } |
| | | |