| | |
| | | package org.opends.server.replication.server; |
| | | |
| | | import java.io.IOException; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.*; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | |
| | | * Add an update that has been received to the list of |
| | | * updates that must be forwarded to all other servers. |
| | | * |
| | | * @param update The update that has been received. |
| | | * @param updateMsg The update that has been received. |
| | | * @param sourceHandler The ServerHandler for the server from which the |
| | | * update was received |
| | | * @throws IOException When an IO exception happens during the update |
| | | * processing. |
| | | */ |
| | | public void put(UpdateMsg update, ServerHandler sourceHandler) |
| | | throws IOException |
| | | public void put(UpdateMsg updateMsg, ServerHandler sourceHandler) throws IOException |
| | | { |
| | | sourceHandler.updateServerState(update); |
| | | sourceHandler.updateServerState(updateMsg); |
| | | sourceHandler.incrementInCount(); |
| | | setGenerationIdIfUnset(sourceHandler.getGenerationId()); |
| | | |
| | |
| | | * data centers, but one will request and wait acks only from servers of the |
| | | * data center 1. |
| | | */ |
| | | boolean assuredMessage = update.isAssured(); |
| | | PreparedAssuredInfo preparedAssuredInfo = null; |
| | | if (assuredMessage) |
| | | { |
| | | // Assured feature is supported starting from replication protocol V2 |
| | | if (sourceHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V2) |
| | | { |
| | | // According to assured sub-mode, prepare structures to keep track of |
| | | // the acks we are interested in. |
| | | AssuredMode assuredMode = update.getAssuredMode(); |
| | | if (assuredMode == AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | sourceHandler.incrementAssuredSdReceivedUpdates(); |
| | | preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler); |
| | | } else if (assuredMode == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | sourceHandler.incrementAssuredSrReceivedUpdates(); |
| | | preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler); |
| | | } else |
| | | { |
| | | // Unknown assured mode: should never happen |
| | | Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get( |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | assuredMode.toString(), baseDN.toNormalizedString(), |
| | | update.toString()); |
| | | logError(errorMsg); |
| | | assuredMessage = false; |
| | | } |
| | | } else |
| | | { |
| | | assuredMessage = false; |
| | | } |
| | | } |
| | | final PreparedAssuredInfo preparedAssuredInfo = getPreparedAssuredInfo(updateMsg, sourceHandler); |
| | | |
| | | if (!publishUpdateMsg(update)) |
| | | if (!publishUpdateMsg(updateMsg)) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | List<Integer> expectedServers = null; |
| | | if (assuredMessage) |
| | | { |
| | | expectedServers = preparedAssuredInfo.expectedServers; |
| | | if (expectedServers != null) |
| | | { |
| | | // Store the expected acks info into the global map. |
| | | // The code for processing reception of acks for this update will update |
| | | // info kept in this object and if enough acks received, it will send |
| | | // back the final ack to the requester and remove the object from this |
| | | // map |
| | | // OR |
| | | // The following timer will time out and send an timeout ack to the |
| | | // requester if the acks are not received in time. The timer will also |
| | | // remove the object from this map. |
| | | CSN csn = update.getCSN(); |
| | | waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo); |
| | | |
| | | // Arm timer for this assured update message (wait for acks until it |
| | | // times out) |
| | | AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn); |
| | | assuredTimeoutTimer.schedule(assuredTimeoutTask, |
| | | localReplicationServer.getAssuredTimeout()); |
| | | // Purge timer every 100 treated messages |
| | | assuredTimeoutTimerPurgeCounter++; |
| | | if ((assuredTimeoutTimerPurgeCounter % 100) == 0) |
| | | { |
| | | assuredTimeoutTimer.purge(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (expectedServers == null) |
| | | { |
| | | expectedServers = Collections.emptyList(); |
| | | } |
| | | final List<Integer> assuredServers = getAssuredServers(updateMsg, preparedAssuredInfo); |
| | | |
| | | /** |
| | | * The update message equivalent to the originally received update message, |
| | |
| | | * references posted on every writer queues. That is why we need a message |
| | | * version with assured flag on and another one with assured flag off. |
| | | */ |
| | | NotAssuredUpdateMsg notAssuredUpdate = null; |
| | | final NotAssuredUpdateMsg notAssuredUpdateMsg = |
| | | preparedAssuredInfo != null ? new NotAssuredUpdateMsg(updateMsg) : null; |
| | | |
| | | // Push the message to the replication servers |
| | | if (sourceHandler.isDataServer()) |
| | |
| | | * Ignore updates to RS with bad gen id |
| | | * (no system managed status for a RS) |
| | | */ |
| | | if (isDifferentGenerationId(rsHandler.getGenerationId())) |
| | | if (!isDifferentGenerationId(rsHandler, updateMsg)) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debug("update " + update.getCSN() |
| | | + " will not be sent to replication server " |
| | | + rsHandler.getServerId() + " with generation id " |
| | | + rsHandler.getGenerationId() + " different from local " |
| | | + "generation id " + generationId); |
| | | addUpdate(rsHandler, updateMsg, notAssuredUpdateMsg, assuredServers); |
| | | } |
| | | |
| | | continue; |
| | | } |
| | | |
| | | notAssuredUpdate = addUpdate(rsHandler, update, notAssuredUpdate, |
| | | assuredMessage, expectedServers); |
| | | } |
| | | } |
| | | |
| | | // Push the message to the LDAP servers |
| | | for (DataServerHandler dsHandler : connectedDSs.values()) |
| | | { |
| | | // Don't forward the change to the server that just sent it |
| | | if (dsHandler == sourceHandler) |
| | | // Do not forward the change to the server that just sent it |
| | | if (dsHandler != sourceHandler |
| | | && !isUpdateMsgFiltered(updateMsg, dsHandler)) |
| | | { |
| | | continue; |
| | | addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers); |
| | | } |
| | | } |
| | | |
| | | // Push the message to the other subscribing handlers |
| | | for (MessageHandler mHandler : otherHandlers) { |
| | | mHandler.add(updateMsg); |
| | | } |
| | | } |
| | | |
| | | private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler, |
| | | UpdateMsg updateMsg) |
| | | { |
| | | final boolean isDifferent = isDifferentGenerationId(rsHandler.getGenerationId()); |
| | | if (isDifferent && debugEnabled()) |
| | | { |
| | | debug("updateMsg " + updateMsg.getCSN() |
| | | + " will not be sent to replication server " |
| | | + rsHandler.getServerId() + " with generation id " |
| | | + rsHandler.getGenerationId() + " different from local " |
| | | + "generation id " + generationId); |
| | | } |
| | | return isDifferent; |
| | | } |
| | | |
| | | /** |
| | | * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS |
| | | * |
| | | * The RSD lock should not be taken here as it is acceptable to have a |
| | | * delay between the time the server has a wrong status and the fact we |
| | | * detect it: the updates that succeed to pass during this time will have |
| | | * no impact on remote server. But it is interesting to not saturate |
| | | * uselessly the network if the updates are not necessary so this check to |
| | | * stop sending updates is interesting anyway. Not taking the RSD lock |
| | | * allows to have better performances in normal mode (most of the time). |
| | | * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS. |
| | | * <p> |
| | | * The RSD lock should not be taken here as it is acceptable to have a delay |
| | | * between the time the server has a wrong status and the fact we detect it: |
| | | * the updates that succeed to pass during this time will have no impact on |
| | | * remote server. But it is interesting to not saturate uselessly the network |
| | | * if the updates are not necessary so this check to stop sending updates is |
| | | * interesting anyway. Not taking the RSD lock allows to have better |
| | | * performances in normal mode (most of the time). |
| | | */ |
| | | ServerStatus dsStatus = dsHandler.getStatus(); |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS |
| | | || dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | private boolean isUpdateMsgFiltered(UpdateMsg updateMsg, DataServerHandler dsHandler) |
| | | { |
| | | final ServerStatus dsStatus = dsHandler.getStatus(); |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | { |
| | | debug("update " + update.getCSN() |
| | | debug("updateMsg " + updateMsg.getCSN() |
| | | + " will not be sent to directory server " |
| | | + dsHandler.getServerId() + " with generation id " |
| | | + dsHandler.getGenerationId() + " different from local " |
| | | + "generation id " + generationId); |
| | | } |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | return true; |
| | | } |
| | | else if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | { |
| | | debug("update " + update.getCSN() |
| | | if (debugEnabled()) |
| | | { |
| | | debug("updateMsg " + updateMsg.getCSN() |
| | | + " will not be sent to directory server " |
| | | + dsHandler.getServerId() + " as it is in full update"); |
| | | } |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | continue; |
| | | private PreparedAssuredInfo getPreparedAssuredInfo(UpdateMsg updateMsg, |
| | | ServerHandler sourceHandler) throws IOException |
| | | { |
| | | // Assured feature is supported starting from replication protocol V2 |
| | | if (!updateMsg.isAssured() |
| | | || sourceHandler.getProtocolVersion() < REPLICATION_PROTOCOL_V2) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | notAssuredUpdate = addUpdate(dsHandler, update, notAssuredUpdate, |
| | | assuredMessage, expectedServers); |
| | | // According to assured sub-mode, prepare structures to keep track of |
| | | // the acks we are interested in. |
| | | switch (updateMsg.getAssuredMode()) |
| | | { |
| | | case SAFE_DATA_MODE: |
| | | sourceHandler.incrementAssuredSdReceivedUpdates(); |
| | | return processSafeDataUpdateMsg(updateMsg, sourceHandler); |
| | | |
| | | case SAFE_READ_MODE: |
| | | sourceHandler.incrementAssuredSrReceivedUpdates(); |
| | | return processSafeReadUpdateMsg(updateMsg, sourceHandler); |
| | | |
| | | default: |
| | | // Unknown assured mode: should never happen |
| | | logError(ERR_RS_UNKNOWN_ASSURED_MODE.get( |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | updateMsg.getAssuredMode().toString(), baseDN.toNormalizedString(), |
| | | updateMsg.toString())); |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | // Push the message to the other subscribing handlers |
| | | for (MessageHandler mHandler : otherHandlers) { |
| | | mHandler.add(update); |
| | | private List<Integer> getAssuredServers(UpdateMsg updateMsg, PreparedAssuredInfo preparedAssuredInfo) |
| | | { |
| | | List<Integer> expectedServers = null; |
| | | if (preparedAssuredInfo != null && preparedAssuredInfo.expectedServers != null) |
| | | { |
| | | expectedServers = preparedAssuredInfo.expectedServers; |
| | | // Store the expected acks info into the global map. |
| | | // The code for processing reception of acks for this update will update |
| | | // info kept in this object and if enough acks received, it will send |
| | | // back the final ack to the requester and remove the object from this map |
| | | // OR |
| | | // The following timer will time out and send an timeout ack to the |
| | | // requester if the acks are not received in time. The timer will also |
| | | // remove the object from this map. |
| | | final CSN csn = updateMsg.getCSN(); |
| | | waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo); |
| | | |
| | | // Arm timer for this assured update message (wait for acks until it times out) |
| | | final AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn); |
| | | assuredTimeoutTimer.schedule(assuredTimeoutTask, localReplicationServer.getAssuredTimeout()); |
| | | // Purge timer every 100 treated messages |
| | | assuredTimeoutTimerPurgeCounter++; |
| | | if ((assuredTimeoutTimerPurgeCounter % 100) == 0) |
| | | { |
| | | assuredTimeoutTimer.purge(); |
| | | } |
| | | } |
| | | |
| | | return expectedServers != null ? expectedServers : Collections.<Integer> emptyList(); |
| | | } |
| | | |
| | | private boolean publishUpdateMsg(UpdateMsg updateMsg) |
| | | { |
| | | try |
| | |
| | | } |
| | | } |
| | | |
| | | private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler, |
| | | UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate, |
| | | boolean assuredMessage, List<Integer> expectedServers) |
| | | throws UnsupportedEncodingException |
| | | private void addUpdate(ServerHandler sHandler, UpdateMsg updateMsg, |
| | | NotAssuredUpdateMsg notAssuredUpdateMsg, List<Integer> assuredServers) |
| | | { |
| | | if (assuredMessage) |
| | | // Assured mode: post an assured or not assured matching update message |
| | | // according to what has been computed for the destination server |
| | | if (notAssuredUpdateMsg != null |
| | | && !assuredServers.contains(sHandler.getServerId())) |
| | | { |
| | | // Assured mode: post an assured or not assured matching update |
| | | // message according to what has been computed for the destination server |
| | | if (expectedServers.contains(sHandler.getServerId())) |
| | | { |
| | | sHandler.add(update); |
| | | sHandler.add(notAssuredUpdateMsg); |
| | | } |
| | | else |
| | | { |
| | | if (notAssuredUpdate == null) |
| | | { |
| | | notAssuredUpdate = new NotAssuredUpdateMsg(update); |
| | | sHandler.add(updateMsg); |
| | | } |
| | | sHandler.add(notAssuredUpdate); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | sHandler.add(update); |
| | | } |
| | | return notAssuredUpdate; |
| | | } |
| | | |
| | | /** |