import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.net.*;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

  /**
   * Replication server URLs, in the format "<code>hostname:port</code>".
   */
  private volatile Set<String> replicationServerUrls;
  private volatile boolean connected = false;
  /**
   * String reported under CSN=monitor when there is no connected RS.
   */

  /** The server id of the RS we are connected to. */
  private Integer rsServerId = -1;
  /** The server URL of the RS we are connected to. */
  private String rsServerUrl;
  /** Our replication domain. */
  private ReplicationDomain domain;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */

  /**
   * A thread to monitor heartbeats on the session.
   */
  private HeartbeatMonitor heartbeatMonitor;
  /**
   * The number of times the connection was lost.
   */

  /**
   * The thread that publishes messages to the RS containing the current
   * change time of this DS.
   */
  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread;
  /**
   * The expected period, in milliseconds, at which these messages are sent
   * to the replication server. Zero means heartbeats are off.
   */

  /*
   * Properties for the last topology info received from the network.
   */

  /**
   * Info for other DSs.
   * <p>
   * Warning: does not contain info for us (for our server id).
   */
  private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
  private volatile long generationID;
  private volatile int updateDoneCount = 0;

   * replication server to which one wants to connect.
   * Key: replication server id. Value: replication server info for the
   * matching replication server id.
   */
  private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos;

  /**
   * This integer defines when the best replication server checking algorithm
   *
   * @param replicationServers the set of replication servers used
   */
  public void start(Set<String> replicationServers)
  {
    synchronized (startStopLock)
    {

    {
      // RS has no generation id
      return ServerStatus.NORMAL_STATUS;
    }
    else if (rsGenId != dsGenId)
    {
      // DS and RS do not have the same generation id
      return ServerStatus.BAD_GEN_ID_STATUS;
    }
    else
    {
      /*
      DS and RS have the same generation id.

      Determine whether we are late to replay changes. The RS uses a
      threshold value for pending changes to be replayed by a DS to
      determine whether the DS is in normal status or in degraded status.
      Compare the local and remote server states using this threshold
      value to determine whether we are late.
      */
      int nChanges = ServerState.diffChanges(rsState, state);
      if (debugEnabled())
      {
        TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
            + serverId + " computed " + nChanges + " changes late.");
      }

      /*
      Check the status to know whether it is relevant to change it. Do not
      take the RSD lock to test. If we attempt to change the status while
      in a status that does not allow it, this will be noticed by the
      changeStatusFromStatusAnalyzer method. This allows taking the lock
      only when needed rather than on every sleep timeout.
      */
      if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold)
      {
        return ServerStatus.DEGRADED_STATUS;
      }
      // A degradedStatusThreshold of 0 means no degrading system is used
      // (no threshold): force normal status.
      return ServerStatus.NORMAL_STATUS;
    }
  }
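
  /*
   * Illustrative walkthrough of the status computation above (hypothetical
   * numbers, not from the original source): with degradedStatusThreshold =
   * 5000, a DS computed 7200 changes late enters DEGRADED_STATUS, while one
   * computed 120 changes late stays in NORMAL_STATUS; with a threshold of 0
   * the check is disabled and the DS always gets NORMAL_STATUS, whatever
   * its backlog. A generation id mismatch yields BAD_GEN_ID_STATUS
   * regardless of the backlog.
   */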

    return bestServers.values().iterator().next();
  }

    if (firstConnection)
    {
      // We are not connected to a server yet
      return computeBestServerForWeight(bestServers, -1, -1);
    }
    /*
     * We are already connected to a RS: compute the best RS as far as the
     * weights are concerned. If it is another one, some DS must disconnect.
     */
    return computeBestServerForWeight(bestServers, rsServerId, localServerId);
  }

  /**

      sumOfWeights += replicationServerInfo.getWeight();
      sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
    }

    // Distance (difference) between the current load and the load goal of
    // each RS. Key: server id. Value: distance.
    Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
    // Precision for the operations (number of digits after the decimal point)
    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);

    if (currentRsServerId == -1)
    {
      // The local server is not connected yet: find the best server to
      // connect to, taking the weights into account.
      return computeBestServerWhenNotConnected(bestServers, loadDistances);
    }
    // The local server is currently connected to a RS: determine whether it
    // must disconnect, taking the weights into account.
    return computeBestServerWhenConnected(bestServers, loadDistances,
        localServerId, currentRsServerId, sumOfWeights, sumOfConnectedDSs);
  }

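  /*
   * Illustrative load-distance walkthrough for computeBestServerForWeight
   * above (hypothetical numbers, not from the original source): with RS1
   * weight 2 and RS2 weight 1, the load goals are 2/3 and 1/3. If 6 DSs
   * are connected in total, 5 of them to RS1, the current loads are 5/6
   * and 1/6, so the load distances are 2/3 - 5/6 = -1/6 for RS1
   * (overloaded) and 1/3 - 1/6 = +1/6 for RS2 (underloaded).
   */
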
  private static ReplicationServerInfo computeBestServerWhenNotConnected(
      Map<Integer, ReplicationServerInfo> bestServers,
      Map<Integer, BigDecimal> loadDistances)
  {
    /*
     * Find the server with the current highest distance to its load goal
     * and choose it. Make an exception if every server is correctly
     * balanced, that is, every current load distance is equal to 0: in
     * that case, choose the server with the highest weight.
     */
    int bestRsId = 0; // If all servers are equal, return the first one
    float highestDistance = Float.NEGATIVE_INFINITY;
    boolean allRsWithZeroDistance = true;
    int highestWeightRsId = -1;
    int highestWeight = -1;
    for (Integer rsId : bestServers.keySet())
    {
      float loadDistance = loadDistances.get(rsId).floatValue();
      if (loadDistance > highestDistance)
      {
        // This server is farther from its balance point
        bestRsId = rsId;
        highestDistance = loadDistance;
      }
      if (loadDistance != 0)
      {
        allRsWithZeroDistance = false;
      }
      int weight = bestServers.get(rsId).getWeight();
      if (weight > highestWeight)
      {
        // This server has a higher weight
        highestWeightRsId = rsId;
        highestWeight = weight;
      }
    }
    // Are all servers at a 0 distance?
    if (allRsWithZeroDistance)
    {
      // Choose the server with the highest weight
      bestRsId = highestWeightRsId;
    }
    return bestServers.get(bestRsId);
  }

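  /*
   * Illustrative example for computeBestServerWhenNotConnected above
   * (hypothetical numbers, not from the original source): with load
   * distances RS1 = +0.10, RS2 = +0.25 and RS3 = -0.35, RS2 is the
   * farthest below its load goal and is chosen. If all distances were 0
   * (perfect balance), the RS with the highest weight would be chosen
   * instead.
   */
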
  private static ReplicationServerInfo computeBestServerWhenConnected(
      Map<Integer, ReplicationServerInfo> bestServers,
      Map<Integer, BigDecimal> loadDistances, int localServerId,
      int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
  {
    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    float currentLoadDistance =
        loadDistances.get(currentRsServerId).floatValue();
    if (currentLoadDistance < 0)
    {
      /*
      Too many DSs are connected to the current RS compared with its load
      goal: determine the potential number of DSs to disconnect from the
      current RS and see if the local DS is part of them (the DSs that
      must disconnect are those with the lowest server ids).
      Compute the sum of the load distances of the other RSs.
      */
      BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
      for (Integer rsId : bestServers.keySet())
      {
        if (rsId != currentRsServerId)
        {
          sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
              loadDistances.get(rsId), mathContext);
        }
      }

      if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
      {
        /*
        The average distance of the other RSs shows a lack of DSs.
        Compute the number of DSs to disconnect from the current RS,
        rounding to the nearest integer. Only do this if there is no risk
        of a yoyo effect: when the exact balance cannot be established due
        to the current number of connected DSs, do not disconnect a DS.
        A simple example where the balance cannot be reached is:
        - RS1 has weight 1 and 2 DSs
        - RS2 has weight 1 and 1 DS
        => disconnecting a DS from RS1 to reconnect it to RS2 would make
        no sense as this would lead to the reverse situation. In that
        case, the perfect balance cannot be reached and we must stick to
        the current situation, otherwise the DS would keep moving between
        the 2 RSs.
        */
        float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
            multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
            .floatValue();
        int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);

        // Avoid yoyo effect
        if (overloadingDSsNumber == 1)
        {
          // What would be the new load distance for the current RS if
          // we disconnected some DSs?
          ReplicationServerInfo currentReplicationServerInfo =
              bestServers.get(currentRsServerId);

          int currentRsWeight = currentReplicationServerInfo.getWeight();
          BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
          BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
          BigDecimal currentRsLoadGoalBd =
              currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
          BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
          if (sumOfConnectedDSs != 0)
          {
            int connectedDSs = currentReplicationServerInfo.
                getConnectedDSNumber();
            BigDecimal potentialNewConnectedDSsBd =
                BigDecimal.valueOf(connectedDSs - 1);
            BigDecimal sumOfConnectedDSsBd =
                BigDecimal.valueOf(sumOfConnectedDSs);
            potentialCurrentRsNewLoadBd =
                potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
                    mathContext);
          }
          BigDecimal potentialCurrentRsNewLoadDistanceBd =
              currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
                  mathContext);

          // What would be the new load distance for the other RSs?
          BigDecimal additionalDsLoadBd =
              BigDecimal.ONE.divide(
                  BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
          BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
              sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
                  mathContext);

          /*
          Now compare both values: we must not disconnect the DS if this
          would lead to a situation where the load distance of the other
          RSs is the opposite of the future load distance of the local RS,
          or we would conclude that we should disconnect again just after
          arriving on the new RS. But we should disconnect if we reach the
          perfect balance (both values are 0).
          */
          MathContext roundMc = new MathContext(6, RoundingMode.DOWN);
          BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
              potentialCurrentRsNewLoadDistanceBd.round(roundMc);
          BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
              potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);

          if (potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
                  BigDecimal.ZERO) != 0
              && potentialCurrentRsNewLoadDistanceBdRounded.equals(
                  potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate()))
          {
            // Avoid the yoyo effect, and keep the local DS connected to its
            // current RS
            return bestServers.get(currentRsServerId);
          }
        }

        // Prepare a sorted list (from lowest to highest) of the server ids
        // of the DSs connected to the current RS
        ReplicationServerInfo currentRsInfo =
            bestServers.get(currentRsServerId);
        List<Integer> serversConnectedToCurrentRS =
            new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
        Collections.sort(serversConnectedToCurrentRS);

        // Go through the list of DSs to disconnect and see if the local
        // server is part of them.
        int index = 0;
        while (overloadingDSsNumber > 0)
        {
          int serverIdToDisconnect = serversConnectedToCurrentRS.get(index);
          if (serverIdToDisconnect == localServerId)
          {
            // The local server is part of the DSs to disconnect
            return null;
          }
          overloadingDSsNumber--;
          index++;
        }

        // The local server is not part of the servers to disconnect from
        // the current RS.
      } else {
        // The average distance of the other RSs does not show a lack of
        // DSs: no need to disconnect any DS from the current RS.
      }
      return bestServers.get(currentRsServerId);
    } else {
      // The RS load goal is reached or there are not enough DSs connected
      // to it to reach it: do not disconnect from this RS and return the
      // rsInfo for this RS.
    }
    return bestServers.get(currentRsServerId);
  }
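
  /*
   * Illustrative yoyo-avoidance walkthrough for
   * computeBestServerWhenConnected above, using the example given in the
   * comment inside that method (RS1 weight 1 with 2 DSs, RS2 weight 1 with
   * 1 DS): both load goals are 1/2, the loads are 2/3 and 1/3, so the load
   * distances are -1/6 (RS1, overloaded) and +1/6 (RS2). The sum of the
   * other RSs' distances is 1/6, giving round(1/6 * 3) = 1 DS to
   * disconnect. The potential new distance of RS1 would be 1/2 - 1/3 =
   * +1/6 and the other RSs' new sum 1/6 - 1/3 = -1/6: exact opposites and
   * non-zero, so the local DS stays connected to RS1 instead of
   * oscillating between the two RSs.
   */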

  /**

   */
  public void publish(ReplicationMsg msg)
  {
    publish(msg, false, true);
  }

  /**

   */
  public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
  {
    return publish(msg, false, retryOnFailure);
  }

  /**

   */
  public void publishRecovery(ReplicationMsg msg)
  {
    publish(msg, true, true);
  }

  /**

   * @param retryOnFailure whether retry should be done on failure
   * @return whether the message was successfully sent
   */
  private boolean publish(ReplicationMsg msg, boolean recoveryMsg,
      boolean retryOnFailure)
  {
    boolean done = false;

    try
    {
      /*
      Save the session at the time when we acquire the send-window credit
      so that we can make sure later that we do not publish the message
      on a session with a credit that was acquired from a previous
      session.
      */
      Session currentSession;
      Semaphore currentWindowSemaphore;
      synchronized (connectPhaseLock)
      {
        currentSession = session;
        currentWindowSemaphore = sendWindow;
      }

        return false;
      }

      boolean credit;
      if (msg instanceof UpdateMsg)
      {
        /*

        {
          credit = true;
        }

      if (credit)
      {
        synchronized (connectPhaseLock)

          reconnection happened and we need to restart from scratch.
          */
          if (session != null && session == currentSession)
          {
            session.publish(msg);
            done = true;

            break;
          }

      final int previousRsServerID = rsServerId;
      try
      {
        ReplicationMsg msg = savedSession.receive();

        {
          // RS performs a proper disconnection
          Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
              previousRsServerID, savedSession.getReadableRemoteAddress(),
              serverId, baseDN.toNormalizedString());
          logError(message);

        {
          // Stable topology (no topo msg for a few seconds): proceed with
          // the best server checking.
          final ReplicationServerInfo bestServerInfo =
              computeBestReplicationServer(false, previousRsServerID, state,
                  replicationServerInfos, serverId, groupId, generationID);
          if (previousRsServerID != -1
              && (bestServerInfo == null
                  || bestServerInfo.getServerId() != previousRsServerID))
          {
            // The best replication server is no longer the one we are
            // currently using. Disconnect properly, then reconnect.

            if (bestServerInfo == null)
            {
              message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
                  serverId, previousRsServerID,
                  savedSession.getReadableRemoteAddress(),
                  baseDN.toNormalizedString());
            }
            else
            {
              message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                  serverId, previousRsServerID,
                  savedSession.getReadableRemoteAddress(),
                  bestServerInfo.getServerId(),
                  baseDN.toNormalizedString());

        {
          // We did not initiate the close on our side, log an error message.
          Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
              serverId, baseDN.toNormalizedString(), previousRsServerID,
              savedSession.getReadableRemoteAddress());
          logError(message);
        }

   * requires restarting the service.
   * @param groupId The new group id to use
   */
  public boolean changeConfig(Set<String> replicationServers, int window,
      long heartbeatInterval, byte groupId)
  {
    // These parameters need to be renegotiated with the ReplicationServer,
    // so if they have changed, that requires restarting the session with

    // the connection is modified
    boolean needToRestartSession =
        this.replicationServerUrls == null
        || !replicationServers.equals(this.replicationServerUrls)
        || window != this.maxRcvWindow
        || heartbeatInterval != this.heartbeatInterval
        || groupId != this.groupId;
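
    /*
     * Illustrative example (hypothetical values, not from the original
     * source): if the broker was started with replication servers
     * {"rs1:8989", "rs2:8989"} and window 100, calling changeConfig with
     * the same server set but window 200 sets needToRestartSession to
     * true, because the window size must be renegotiated with the RS;
     * calling it again with all values unchanged leaves it false.
     */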

   */
  public List<RSInfo> getRsList()
  {
    final List<RSInfo> result = new ArrayList<RSInfo>();
    for (ReplicationServerInfo rsInfo : replicationServerInfos.values())
    {
      result.add(rsInfo.toRSInfo());
    }
    return result;
  }

  }

  /**
   * Returns the replication monitor instance name associated with this
   * broker.
   *
   * @return The replication monitor instance name.
   */
  String getReplicationMonitorInstanceName()
  {
    // Only invoked by the replication domain, so always non-null.
    return monitor.getMonitorInstanceName();
  }

  private void setSession(final Session newSession)