| | |
| | | // According to assured sub-mode, prepare structures to keep track of |
| | | // the acks we are interested in. |
| | | AssuredMode assuredMode = update.getAssuredMode(); |
| | | if (assuredMode != AssuredMode.SAFE_DATA_MODE) |
| | | if (assuredMode == AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler); |
| | | } else if (assuredMode != AssuredMode.SAFE_READ_MODE) |
| | | } else if (assuredMode == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler); |
| | | } else |
| | |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | if (handler.getGroupId() == groupId) |
| | | expectedServers.add(handler.getServerId()); |
| | | // No ack expected from a RS with different group id |
| | | { |
| | | if ((generationId > 0) && |
| | | (generationId == handler.getGenerationId())) |
| | | // No ack expected from a RS with bad gen id |
| | | { |
| | | expectedServers.add(handler.getServerId()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | continue; |
| | | } |
| | | if (handler.getGroupId() == groupId) |
| | | // No ack expected from a DS with different group id |
| | | { |
| | | if (handler.getStatus() == ServerStatus.NORMAL_STATUS) |
| | | ServerStatus serverStatus = handler.getStatus(); |
| | | if (serverStatus == ServerStatus.NORMAL_STATUS) |
| | | { |
| | | expectedServers.add(handler.getServerId()); |
| | | } else |
| | | // No ack expected from a DS with wrong status |
| | | { |
| | | wrongStatusServers.add(handler.getServerId()); |
| | | if (serverStatus == ServerStatus.DEGRADED_STATUS) |
| | | { |
| | | wrongStatusServers.add(handler.getServerId()); |
| | | } else |
| | | { |
| | | /* |
| | | * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS: |
| | | * We do not want this to be reported as an error to the update |
| | | * maker -> no pollution or potential misunderstanding when |
| | | * reading logs or monitoring, as it was just administration (for |
| | | * instance a new server is being configured in the topology: it goes |
| | | * into bad gen id status and then into full update status). |
| | | */ |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | UpdateMsg update, ServerHandler sourceHandler) throws IOException |
| | | { |
| | | ChangeNumber cn = update.getChangeNumber(); |
| | | boolean interestedInAcks = true; |
| | | boolean interestedInAcks = false; |
| | | byte safeDataLevel = update.getSafeDataLevel(); |
| | | byte groupId = replicationServer.getGroupId(); |
| | | byte sourceGroupId = sourceHandler.getGroupId(); |
| | |
| | | Short.toString(replicationServer.getServerId()), |
| | | Byte.toString(safeDataLevel), baseDn, update.toString()); |
| | | logError(errorMsg); |
| | | interestedInAcks = false; |
| | | } else if (sourceGroupId != groupId) |
| | | { |
| | | // Assured feature does not cross different group ids |
| | | interestedInAcks = false; |
| | | } else |
| | | { |
| | | if (sourceHandler.isLDAPserver()) |
| | | if ((generationId > 0) && |
| | | (generationId == sourceHandler.getGenerationId())) |
| | | // Ignore assured updates from wrong generationid servers |
| | | { |
| | | if (safeDataLevel == (byte) 1) |
| | | if (sourceHandler.isLDAPserver()) |
| | | { |
| | | // Immediatly return the ack for an assured message in safe data mode |
| | | // with safe data level 1, coming from a DS. No need to wait for more |
| | | // acks |
| | | AckMsg ack = new AckMsg(cn); |
| | | sourceHandler.sendAck(ack); |
| | | interestedInAcks = false; // No further acks to obtain |
| | | if (safeDataLevel == (byte) 1) |
| | | { |
| | | // Immediately return the ack for an assured message in safe data |
| | | // mode with safe data level 1, coming from a DS. No need to wait |
| | | // for more acks |
| | | AckMsg ack = new AckMsg(cn); |
| | | sourceHandler.sendAck(ack); |
| | | } else |
| | | { |
| | | if (safeDataLevel != (byte) 0) |
| | | { |
| | | // level > 1 : We need further acks |
| | | // The message will be posted in assured mode to eligible |
| | | // servers. The embedded safe data level is not changed, and its |
| | | // value will be used by a remote RS to determine if it must send |
| | | // an ack (level > 1) or not (level = 1) |
| | | interestedInAcks = true; |
| | | } else |
| | | { |
| | | // Should never happen |
| | | } |
| | | } |
| | | } else |
| | | { |
| | | // level > 1 : We need further acks |
| | | // The message will be posted in assured mode to elligible servers. |
| | | // The embedded safe data level is not changed, and his value will be |
| | | // used by a remote RS to determine if he must send an ack (level > 1) |
| | | // or not (level = 1) |
| | | } |
| | | } else |
| | | { // A RS sent us the safe data message, for sure no futher acks to wait |
| | | interestedInAcks = false; |
| | | if (safeDataLevel == (byte) 1) |
| | | { |
| | | // The original level was 1 so the RS that sent us this message should |
| | | // have already sent his ack to the sender DS. Level 1 has already |
| | | // been reached so no further acks to wait |
| | | // This should not happen in theory as the sender RS server should |
| | | // have sent us a matching not assured message so we should not come |
| | | // to here. |
| | | } else |
| | | { |
| | | // level > 1, so Ack this message to originator RS |
| | | AckMsg ack = new AckMsg(cn); |
| | | sourceHandler.sendAck(ack); |
| | | { // A RS sent us the safe data message, for sure no futher acks to wait |
| | | if (safeDataLevel == (byte) 1) |
| | | { |
| | | // The original level was 1 so the RS that sent us this message |
| | | // should have already sent his ack to the sender DS. Level 1 has |
| | | // already been reached so no further acks to wait. |
| | | // This should not happen in theory as the sender RS server should |
| | | // have sent us a matching not assured message so we should not come |
| | | // to here. |
| | | } else |
| | | { |
| | | // level > 1, so Ack this message to originator RS |
| | | AckMsg ack = new AckMsg(cn); |
| | | sourceHandler.sendAck(ack); |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | if (handler.getGroupId() == groupId) |
| | | expectedServers.add(handler.getServerId()); |
| | | // No ack expected from a RS with different group id |
| | | { |
| | | if ((generationId > 0) && |
| | | (generationId == handler.getGenerationId())) |
| | | // No ack expected from a RS with bad gen id |
| | | { |
| | | expectedServers.add(handler.getServerId()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Look for DS eligible for assured |
| | | for (ServerHandler handler : directoryServers.values()) |
| | | { |
| | | // Don't forward the change to the server that just sent it |
| | | if (handler == sourceHandler) |
| | | { |
| | | continue; |
| | | } |
| | | if (handler.getGroupId() == groupId) |
| | | expectedServers.add(handler.getServerId()); |
| | | } |
| | | } |
| | | |
| | | // Return computed structures |
| | | PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo(); |
| | | if (interestedInAcks && (expectedServers.size() > 0)) |
| | | int nExpectedServers = expectedServers.size(); |
| | | if (interestedInAcks) // interestedInAcks so level > 1 |
| | | { |
| | | // Some other acks to wait for |
| | | preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn, |
| | | sourceHandler, update.getSafeDataLevel()); |
| | | preparedAssuredInfo.expectedServers = expectedServers; |
| | | } |
| | | |
| | | if (interestedInAcks && (preparedAssuredInfo.expectedServers == null)) |
| | | { |
| | | // level > 1 and source is a DS but no elligible servers found, send the |
| | | // ack immediatly |
| | | AckMsg ack = new AckMsg(cn); |
| | | sourceHandler.sendAck(ack); |
| | | if (nExpectedServers > 0) |
| | | { |
| | | // Some other acks to wait for |
| | | int sdl = update.getSafeDataLevel(); |
| | | int neededAdditionalServers = sdl - 1; |
| | | // Change the number of expected acks if not enough available eligible |
| | | // servers: the level is a best effort thing, we do not want to time out |
| | | // at every assured SD update, for instance if a RS has had its gen id |
| | | // reset |
| | | byte finalSdl = ((nExpectedServers >= neededAdditionalServers) ? |
| | | (byte)sdl : // Keep level as it was |
| | | (byte)(nExpectedServers+1)); // Change level to match what's available |
| | | preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn, |
| | | sourceHandler, finalSdl, expectedServers); |
| | | preparedAssuredInfo.expectedServers = expectedServers; |
| | | } else |
| | | { |
| | | // level > 1 and source is a DS but no eligible servers found, send the |
| | | // ack immediately |
| | | AckMsg ack = new AckMsg(cn); |
| | | sourceHandler.sendAck(ack); |
| | | } |
| | | } |
| | | |
| | | return preparedAssuredInfo; |
| | |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | // Terminate the assured timer |
| | | assuredTimeoutTimer.cancel(); |
| | | |
| | | // Close session with other changelogs |
| | | for (ServerHandler serverHandler : replicationServers.values()) |
| | | { |