| | |
| | | * received, it is incremented. When it reaches 2, we run the checking |
| | | * algorithm to see if we must reconnect to a better replication server. |
| | | * Then we reset the value to 0. But when a topology message is received, the |
| | | * integer is reset to 0. This ensures that we wait at least one monitoring |
| | | * publisher period before running the algorithm, but also that we wait at |
| | | * least for a monitoring period after the last received topology message |
| | | * (topology stabilization). |
| | |
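| | | /* |
| | | A minimal, self-contained sketch of the debounce described above. The class |
| | | and callback names are hypothetical; only the counter behaviour mirrors the |
| | | comment: bump on every monitoring message, clear on every topology message, |
| | | and only run the check once the counter reaches 2. |
| | | */ |
| | | final class BestServerCheckDebounce |
| | | { |
| | |   private final Runnable check;   // the re-evaluation to run |
| | |   private int monitoringMsgCount; // the integer described above |
| | |
| | |   BestServerCheckDebounce(Runnable check) |
| | |   { |
| | |     this.check = check; |
| | |   } |
| | |
| | |   synchronized void onMonitoringMessage() |
| | |   { |
| | |     if (++monitoringMsgCount >= 2) |
| | |     { |
| | |       // At least one full monitoring period since the last topology change. |
| | |       check.run(); |
| | |       monitoringMsgCount = 0; |
| | |     } |
| | |   } |
| | |
| | |   synchronized void onTopologyMessage() |
| | |   { |
| | |     // Topology is still moving: restart the stabilization window. |
| | |     monitoringMsgCount = 0; |
| | |   } |
| | | } |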
| | | /** |
| | | * Sets the locally configured flag for the passed ReplicationServerInfo |
| | | * object, analyzing the local configuration. |
| | | * @param replicationServerInfo the Replication server to check and update |
| | | */ |
| | | private void updateRSInfoLocallyConfiguredStatus( |
| | | ReplicationServerInfo replicationServerInfo) |
| | |
| | | */ |
| | | private void connectAsDataServer() |
| | | { |
| | | /* |
| | | May have created a broker with null replication domain for |
| | | unit test purpose. |
| | | */ |
| | | if (domain != null) |
| | | { |
| | | /* |
| | | If a first connect or a connection failure occurs, we go through here. |
| | | Force the status machine to NOT_CONNECTED_STATUS so that monitoring can |
| | | see that we are not connected. |
| | | */ |
| | | domain.toNotConnectedStatus(); |
| | | } |
| | | |
| | | /* |
| | | Stop any existing heartbeat monitor and changeTime publisher |
| | | from a previous session. |
| | | */ |
| | | stopRSHeartBeatMonitoring(); |
| | | stopChangeTimeHeartBeatPublishing(); |
| | | mustRunBestServerCheckingAlgorithm = 0; |
| | |
| | | { |
| | | // At least one server answered, find the best one. |
| | | electedRsInfo = computeBestReplicationServer(true, -1, state, |
| | | replicationServerInfos, serverId, groupId, getGenerationID()); |
| | | |
| | | // Best found, now initialize connection to this one (handshake phase 1) |
| | | if (debugEnabled()) |
| | |
| | | |
| | | if (electedRsInfo != null) |
| | | { |
| | | /* |
| | | Update replication server info with potentially more up to date |
| | | data (server state for instance may have changed) |
| | | */ |
| | | replicationServerInfos |
| | | .put(electedRsInfo.getServerId(), electedRsInfo); |
| | | |
| | |
| | | |
| | | } // Reached some servers |
| | | |
| | | // connected is set by connectToReplicationServer() |
| | | // and electedRsInfo isn't null then. Check anyway |
| | | if (electedRsInfo != null && connected) |
| | | { |
| | | connectPhaseLock.notify(); |
| | | |
| | |
| | | |
| | | receiveTopo(topologyMsg); |
| | | |
| | | /* |
| | | Log a message to let the administrator know that the failure |
| | | was resolved. |
| | | Wake up all the threads that were waiting on the window |
| | | on the previous connection. |
| | | */ |
| | | connectionError = false; |
| | | if (sendWindow != null) |
| | | { |
| | |
| | | rcvWindow = maxRcvWindow; |
| | | connected = true; |
| | | |
| | | /* |
| | | May have created a broker with null replication domain for |
| | | unit test purpose. |
| | | */ |
| | | if (domain != null) |
| | | { |
| | | domain.sessionInitiated(initStatus, rsInfo.getServerState(), rsInfo |
| | |
| | | |
| | | if (getRsGroupId() != groupId) |
| | | { |
| | | /* |
| | | Connected to replication server with wrong group id: |
| | | warn user and start heartbeat monitor to recover when a server |
| | | with the right group id shows up. |
| | | */ |
| | | Message message = |
| | | WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(Byte |
| | | .toString(groupId), Integer.toString(rsServerId), rsInfo |
| | |
| | | } |
| | | finally |
| | | { |
| | | if (!connected) |
| | | { |
| | | ProtocolSession localSession = session; |
| | | if (localSession != null) |
| | |
| | | { |
| | | if (rsGenId == dsGenId) |
| | | { |
| | | /* |
| | | DS and RS have same generation id |
| | | |
| | | Determine if we are late or not to replay changes. RS uses a |
| | | threshold value for pending changes to be replayed by a DS to |
| | | determine if the DS is in normal status or in degraded status. |
| | | Let's compare the local and remote server state using this threshold |
| | | value to determine if we are late or not |
| | | */ |
| | | |
| | | ServerStatus initStatus; |
| | | int nChanges = ServerState.diffChanges(rsState, state); |
| | | |
| | | if (debugEnabled()) |
| | |
| | | Integer.toString(nChanges) + " changes late."); |
| | | } |
| | | |
| | | /* |
| | | Check the status to know whether it is relevant to change it. Do not |
| | | take the RSD lock to test. If we attempt to change the status while |
| | | we are in a status that does not allow that, this will be noticed by |
| | | the changeStatusFromStatusAnalyzer method. This allows taking the |
| | | lock roughly only when needed rather than at every sleep timeout. |
| | | */ |
| | | if (degradedStatusThreshold > 0) |
| | | { |
| | | if (nChanges >= degradedStatusThreshold) |
| | |
| | | } |
| | | } else |
| | | { |
| | | /* |
| | | 0 threshold value means no degrading system used (no threshold): |
| | | force normal status |
| | | */ |
| | | initStatus = ServerStatus.NORMAL_STATUS; |
| | | } |
| | | |
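| | | /* |
| | | A tiny standalone illustration (made-up numbers) of the threshold test |
| | | above: with a degraded-status threshold of 5000 pending changes and a DS |
| | | found 6200 changes late, the DS starts in degraded status. |
| | | */ |
| | | final class DegradedStatusExample |
| | | { |
| | |   public static void main(String[] args) |
| | |   { |
| | |     int degradedStatusThreshold = 5000; // 0 disables the check |
| | |     int nChanges = 6200;                // changes the DS is late by |
| | |     boolean degraded = |
| | |         degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold; |
| | |     System.out.println(degraded ? "degraded status" : "normal status"); |
| | |   } |
| | | } |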
| | |
| | | * reply message from the replication server. |
| | | * |
| | | * @param server Server we are connecting with. |
| | | * @param initStatus The status we are starting with |
| | | * @return The ReplServerStartMsg the server replied with, or null if we |
| | | * could not get an answer. |
| | | */ |
| | |
| | | * |
| | | * Note: this method is static for test purpose (access from unit tests) |
| | | * |
| | | * |
| | | * @param firstConnection True if we run this method for the very first |
| | | * connection of the broker. False if we run this method to determine if the |
| | | * replication server we are currently connected to is still the best or not. |
| | |
| | | * @param rsInfos The list of available replication servers and their |
| | | * associated information (choice will be made among them). |
| | | * @param localServerId The server id for the suffix we are working on. |
| | | * @param groupId The groupId we prefer being connected to if possible |
| | | * @param generationId The generation id we are using |
| | | * @return The computed best replication server. If the returned value is |
| | |
| | | * one). Null can only be returned when firstConnection is false. |
| | | */ |
| | | public static ReplicationServerInfo computeBestReplicationServer( |
| | | boolean firstConnection, int rsServerId, ServerState myState, |
| | | Map<Integer, ReplicationServerInfo> rsInfos, int localServerId, |
| | | byte groupId, long generationId) |
| | | { |
| | | |
| | | // Shortcut, if only one server, this is the best |
| | |
| | | * - replication server in the same VM as local DS one |
| | | */ |
| | | Map<Integer, ReplicationServerInfo> bestServers = rsInfos; |
| | | /* |
| | | The list of best replication servers is filtered with each criterion. At |
| | | each step, the list is replaced with the filtered one if the filtering |
| | | kept some servers; otherwise the list is left as is and the filtering |
| | | for the next criterion is applied, and so on. |
| | | |
| | | |
| | | Use only servers locally configured: those are servers declared in |
| | | the local configuration. When the current method is called, at least |
| | | one server from the list is guaranteed to be locally configured. |
| | | */ |
| | | bestServers = |
| | | keepBest(filterServersLocallyConfigured(bestServers), bestServers); |
| | | // Some servers with same group id? |
| | |
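| | | /* |
| | | A hedged sketch of what keepBest is assumed to do, going by the comment |
| | | above; the real implementation may differ. It keeps the filtered servers |
| | | only when the criterion matched at least one of them, otherwise it falls |
| | | back to the unfiltered map. |
| | | */ |
| | | static <K, V> java.util.Map<K, V> keepBest( |
| | |     java.util.Map<K, V> filtered, java.util.Map<K, V> original) |
| | | { |
| | |   return filtered.isEmpty() ? original : filtered; |
| | | } |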
| | | return computeBestServerForWeight(bestServers, -1, -1); |
| | | } else |
| | | { |
| | | /* |
| | | We are already connected to a RS: compute the best RS as far as the |
| | | weights are concerned. If this is another one, some DS must |
| | | disconnect. |
| | | */ |
| | | return computeBestServerForWeight(bestServers, rsServerId, |
| | | localServerId); |
| | | } |
| | |
| | | int rsWeight = replicationServerInfo.getWeight(); |
| | | // load goal = rs weight / sum of weights |
| | | BigDecimal loadGoalBd = BigDecimal.valueOf(rsWeight).divide( |
| | | BigDecimal.valueOf(sumOfWeights), mathContext); |
| | | BigDecimal currentLoadBd = BigDecimal.ZERO; |
| | | if (sumOfConnectedDSs != 0) |
| | | { |
| | | // current load = number of connected DSs / total number of DSs |
| | | int connectedDSs = replicationServerInfo.getConnectedDSNumber(); |
| | | currentLoadBd = BigDecimal.valueOf(connectedDSs).divide( |
| | | BigDecimal.valueOf(sumOfConnectedDSs), mathContext); |
| | | } |
| | | // load distance = load goal - current load |
| | | BigDecimal loadDistanceBd = |
| | |
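| | | /* |
| | | A small illustration (made-up numbers, reusing the java.math classes |
| | | already used by the surrounding code) of the load goal / current load / |
| | | load distance arithmetic above: an RS carrying 2 of the 5 weight units, |
| | | with 3 of the 10 connected DSs, is under its goal by 0.1. |
| | | */ |
| | | final class LoadDistanceExample |
| | | { |
| | |   public static void main(String[] args) |
| | |   { |
| | |     MathContext mathContext = new MathContext(9); |
| | |     BigDecimal loadGoal = BigDecimal.valueOf(2) |
| | |         .divide(BigDecimal.valueOf(5), mathContext);  // 2/5 = 0.4 |
| | |     BigDecimal currentLoad = BigDecimal.valueOf(3) |
| | |         .divide(BigDecimal.valueOf(10), mathContext); // 3/10 = 0.3 |
| | |     BigDecimal loadDistance = loadGoal.subtract(currentLoad, mathContext); |
| | |     // +0.1: the RS is under its goal and can accept more DSs; a negative |
| | |     // distance would mean too many DSs are connected to it. |
| | |     System.out.println("goal=" + loadGoal + " load=" + currentLoad |
| | |         + " distance=" + loadDistance); |
| | |   } |
| | | } |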
| | | loadDistances.get(currentRsServerId).floatValue(); |
| | | if (currentLoadDistance < 0) |
| | | { |
| | | /* |
| | | Too many DSs are connected to the current RS, compared with its load |
| | | goal: |
| | | Determine the potential number of DSs to disconnect from the current |
| | | RS and see if the local DS is part of them: the DSs that must |
| | | disconnect are those with the lowest server id. |
| | | Compute the sum of the distances of the load goals of the other RSs |
| | | */ |
| | | BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO; |
| | | for (Integer rsId : bestServers.keySet()) |
| | | { |
| | |
| | | |
| | | if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0) |
| | | { |
| | | /* |
| | | The average distance of the other RSs shows a lack of DSs. |
| | | Compute the number of DSs to disconnect from the current RS, |
| | | rounding to the nearest integer. Only do this if there is |
| | | no risk of a yoyo effect: when the exact balance cannot be |
| | | established due to the current number of DSs connected, do not |
| | | disconnect a DS. A simple example where the balance cannot be |
| | | reached is: |
| | | - RS1 has weight 1 and 2 DSs |
| | | - RS2 has weight 1 and 1 DS |
| | | => disconnecting a DS from RS1 to reconnect it to RS2 would make no |
| | | sense as this would lead to the reverse situation. In that case, |
| | | the perfect balance cannot be reached and we must stick to the |
| | | current situation, otherwise the DS would keep moving between the 2 |
| | | RSs |
| | | */ |
| | | float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd. |
| | | multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext) |
| | | .floatValue(); |
| | |
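| | | /* |
| | | Worked numbers for the RS1/RS2 example in the comment above, as a small |
| | | standalone sketch using plain doubles instead of the BigDecimal code |
| | | around it. |
| | | */ |
| | | final class YoyoBalanceExample |
| | | { |
| | |   public static void main(String[] args) |
| | |   { |
| | |     // RS1: weight 1 and 2 DSs (we are one of them). RS2: weight 1 and 1 DS. |
| | |     double totalDSs = 3; |
| | |     double rs1Distance = 1.0 / 2 - 2 / totalDSs; // -0.1667: RS1 overloaded |
| | |     double rs2Distance = 1.0 / 2 - 1 / totalDSs; // +0.1667: RS2 lacks DSs |
| | |
| | |     // DSs that would have to leave RS1 to fill that lack: 0.5 of a DS. |
| | |     double overloadingDSs = rs2Distance * totalDSs; |
| | |
| | |     // If one DS actually moved, the imbalance would simply change sign: |
| | |     double rs2DistanceAfterMove = rs2Distance - 1 / totalDSs; // -0.1667 |
| | |
| | |     // rs2DistanceAfterMove is the exact opposite of RS1's future distance, |
| | |     // so disconnecting would only mirror the imbalance (the yoyo effect) |
| | |     // and the local DS stays where it is. |
| | |     System.out.printf("overloadingDSs=%.2f afterMove=%.4f%n", |
| | |         overloadingDSs, rs2DistanceAfterMove); |
| | |   } |
| | | } |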
| | | // What would be the new load distance for the other RSs? |
| | | BigDecimal additionalDsLoadBd = |
| | | BigDecimal.ONE.divide( |
| | | BigDecimal.valueOf(sumOfConnectedDSs), mathContext); |
| | | BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd = |
| | | sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd, |
| | | mathContext); |
| | | |
| | | /* |
| | | Now compare both values: we must not disconnect the DS if this |
| | | would lead to a situation where the load distance of the other |
| | | RSs is the opposite of the future load distance of the local RS, |
| | | or we would conclude that we should disconnect again just after |
| | | arriving at the new RS. But we should disconnect if we reach the |
| | | perfect balance (both values are 0). |
| | | */ |
| | | MathContext roundMc = |
| | | new MathContext(6, RoundingMode.DOWN); |
| | | BigDecimal potentialCurrentRsNewLoadDistanceBdRounded = |
| | |
| | | while (true) |
| | | { |
| | | // Synchronize inside the loop in order to allow shutdown. |
| | | boolean needSleep = false; |
| | | |
| | | synchronized (startStopLock) |
| | | { |
| | | if (connected || shutdown) |
| | |
| | | break; |
| | | } |
| | | |
| | | needSleep = true; |
| | | } |
| | | |
| | | if (needSleep) |
| | | { |
| | | try |
| | | { |
| | | Thread.sleep(500); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // ignore |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | if (connectionError) |
| | | { |
| | | /* |
| | | It was not possible to connect to any replication server. |
| | | Since the operation was already processed, we have no other |
| | | choice than to return without sending the ReplicationMsg |
| | | and relying on the resend procedure of the connect phase to |
| | | fix the problem when we finally connect. |
| | | */ |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | |
| | | ProtocolSession current_session; |
| | | Semaphore currentWindowSemaphore; |
| | | |
| | | /* |
| | | save the session at the time when we acquire the |
| | | sendwindow credit so that we can make sure later |
| | | that the session did not change in between. |
| | | This is necessary to make sure that we don't publish a message |
| | | on a session with a credit that was acquired from a previous |
| | | session. |
| | | */ |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | current_session = session; |
| | | currentWindowSemaphore = sendWindow; |
| | | } |
| | | |
| | | /* |
| | | If the Replication domain has decided that there is a need to |
| | | recover some changes then it is not allowed to send this |
| | | change but it will be the responsibility of the recovery thread to |
| | | do it. |
| | | */ |
| | | if (!recoveryMsg && connectRequiresRecovery) |
| | | { |
| | | return false; |
| | |
| | | |
| | | if (msg instanceof UpdateMsg) |
| | | { |
| | | /* |
| | | Acquiring the window credit must be done outside of the |
| | | connectPhaseLock because it can be blocking and we don't |
| | | want to hold off reconnection in case the connection dropped. |
| | | */ |
| | | credit = |
| | | currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS); |
| | | } else |
| | |
| | | { |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | /* |
| | | session may have been set to null in the connection phase |
| | | when restarting the broker for example. |
| | | Check the session. If it has changed, some disconnection or |
| | | reconnection happened and we need to restart from scratch. |
| | | */ |
| | | |
| | | if ((session != null) && |
| | | (session == current_session)) |
| | | { |
| | |
| | | { |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | /* |
| | | the window is still closed. |
| | | Send a WindowProbeMsg message to wake up the receiver in case the |
| | | window update message was lost somehow... |
| | | then loop to check again if connection was closed. |
| | | */ |
| | | if (session != null) { |
| | | session.publish(new WindowProbeMsg()); |
| | | } |
| | |
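| | | /* |
| | | A generic, hypothetical sketch of the publish pattern used above: capture |
| | | the session and its send-window semaphore under the lock, block for a |
| | | credit outside the lock, then re-check under the lock that the session |
| | | has not changed before publishing. All names here are illustrative, not |
| | | the broker's real fields. |
| | | */ |
| | | final class GuardedPublisher |
| | | { |
| | |   private final Object lock = new Object(); |
| | |   private Object session; // stands in for the protocol session |
| | |   private java.util.concurrent.Semaphore sendWindow; // send-window credits |
| | |
| | |   boolean tryPublish() throws InterruptedException |
| | |   { |
| | |     Object capturedSession; |
| | |     java.util.concurrent.Semaphore capturedWindow; |
| | |     synchronized (lock) |
| | |     { |
| | |       // Capture both under the lock so the credit and the session match. |
| | |       capturedSession = session; |
| | |       capturedWindow = sendWindow; |
| | |     } |
| | |     // Blocking acquire happens outside the lock so a reconnection in |
| | |     // progress is not held off. |
| | |     boolean credit = capturedWindow != null |
| | |         && capturedWindow.tryAcquire(500, |
| | |             java.util.concurrent.TimeUnit.MILLISECONDS); |
| | |     synchronized (lock) |
| | |     { |
| | |       // Only publish if the session is still the one the credit came from |
| | |       // (a real implementation would release an unused credit). |
| | |       if (credit && session != null && session == capturedSession) |
| | |       { |
| | |         // session.publish(msg) would go here |
| | |         return true; |
| | |       } |
| | |     } |
| | |     return false; |
| | |   } |
| | | } |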
| | | |
| | | /** |
| | | * Receive a message. |
| | | * This method is not thread-safe and should either always be |
| | | * called in a single thread or protected by a locking mechanism |
| | | * before being called. This is a wrapper around the variant of this method |
| | | * that takes a boolean, so that we do not have to modify existing tests. |
| | |
| | | |
| | | /** |
| | | * Receive a message. |
| | | * This method is not thread-safe and should either always be |
| | | * called in a single thread or protected by a locking mechanism |
| | | * before being called. |
| | | * |
| | |
| | | } |
| | | } |
| | | |
| | | /* |
| | | Now, if it is allowed, compute the best replication server to see if |
| | | it is still the one we are currently connected to. If not, |
| | | disconnect properly and let the connection algorithm reconnect to |
| | | the best replication server. |
| | | */ |
| | | if (reconnectToTheBestRS) |
| | | { |
| | | mustRunBestServerCheckingAlgorithm++; |
| | |
| | | // best server checking. |
| | | ReplicationServerInfo bestServerInfo = |
| | | computeBestReplicationServer(false, rsServerId, state, |
| | | replicationServerInfos, serverId, groupId, |
| | | generationID); |
| | | |
| | | if ((rsServerId != -1) && ((bestServerInfo == null) || |
| | |
| | | |
| | | if (rsServerId == rsId) |
| | | { |
| | | /* |
| | | If we are computing connected DSs for the RS we are connected |
| | | to, we should count the local DS, because the DSInfo of the local |
| | | DS is not sent by the replication server in the topology message. |
| | | We must count ourselves as a connected server. |
| | | */ |
| | | connectedDSs.add(serverId); |
| | | } |
| | | |