| | |
| | | * out which one is the best to connect to. |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("serverId: " + serverId |
| | | + " phase 1 : will perform PhaseOneH with each RS in " |
| | | + " order to elect the preferred one"); |
| | | debugInfo("phase 1 : will perform PhaseOneH with each RS in order to " |
| | | + "elect the preferred one"); |
| | | |
| | | // Get info from every available replication servers |
| | | replicationServerInfos = collectReplicationServersInfo(); |
| | |
| | | if (replicationServerInfos.size() > 0) |
| | | { |
| | | // At least one server answered, find the best one. |
| | | electedRsInfo = computeBestReplicationServer(true, -1, state, |
| | | replicationServerInfos, serverId, getGroupId(), getGenerationID()); |
| | | RSEvaluations evals = computeBestReplicationServer(true, -1, state, |
| | | replicationServerInfos, serverId, getGroupId(), getGenerationID()); |
| | | electedRsInfo = evals.getBestRS(); |
| | | |
| | | // Best found, now initialize connection to this one (handshake phase 1) |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("serverId: " + serverId |
| | | + " phase 2 : will perform PhaseOneH with the preferred RS=" |
| | | + electedRsInfo); |
| | | debugInfo("phase 2 : will perform PhaseOneH with the preferred RS=" |
| | | + electedRsInfo); |
| | | electedRsInfo = performPhaseOneHandshake( |
| | | electedRsInfo.getServerURL(), true, false); |
| | | |
| | |
| | | int nChanges = ServerState.diffChanges(rsState, state); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("RB for dn " + getBaseDN() + " and with server id " |
| | | + getServerId() + " computed " + nChanges + " changes late."); |
| | | debugInfo("computed " + nChanges + " changes late."); |
| | | } |
| | | |
| | | /* |
| | |
| | | ReplicationMsg msg = localSession.receive(); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n" |
| | | + serverStartMsg + "\nAND RECEIVED:\n" + msg); |
| | | debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n" |
| | | + msg); |
| | | } |
| | | |
| | | // Wrap received message in a server info object |
| | |
| | | // FIXME ECL In the handshake phase two, should RS send back a topo msg ? |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n" |
| | | + startECLSessionMsg); |
| | | TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg); |
| | | } |
| | | |
| | | // Alright set the timeout to the desired value |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n" |
| | | + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg); |
| | | TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg |
| | | + "\nAND RECEIVED:\n" + topologyMsg); |
| | | } |
| | | |
| | | // Alright set the timeout to the desired value |
| | |
| | | } |
| | | |
| | | /** |
| | | * Class holding evaluation results for electing the best replication server |
| | | * for the local directory server. |
| | | */ |
| | | static class RSEvaluations |
| | | { |
| | | |
| | | private final int localServerId; |
| | | private Map<Integer, ReplicationServerInfo> bestRSs; |
| | | private final Map<Integer, Message> rsEvals = |
| | | new HashMap<Integer, Message>(); |
| | | |
| | | /** |
| | | * Ctor. |
| | | * |
| | | * @param localServerId |
| | | * the serverId for the local directory server |
| | | * @param rsInfos |
| | | * a Map of serverId => {@link ReplicationServerInfo} with all the |
| | | * candidate replication servers |
| | | */ |
| | | RSEvaluations(int localServerId, |
| | | Map<Integer, ReplicationServerInfo> rsInfos) |
| | | { |
| | | this.localServerId = localServerId; |
| | | this.bestRSs = rsInfos; |
| | | } |
| | | |
| | | private boolean keepBest(LocalEvaluation eval) |
| | | { |
| | | if (eval.hasAcceptedAny()) |
| | | { |
| | | bestRSs = eval.getAccepted(); |
| | | rsEvals.putAll(eval.getRejected()); |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | | * Sets the elected best replication server, rejecting all the other |
| | | * replication servers with the supplied evaluation. |
| | | * |
| | | * @param bestRsId |
| | | * the serverId of the elected replication server |
| | | * @param rejectedRSsEval |
| | | * the evaluation for all the rejected replication servers |
| | | */ |
| | | private void setBestRS(int bestRsId, Message rejectedRSsEval) |
| | | { |
| | | for (Iterator<Entry<Integer, ReplicationServerInfo>> it = |
| | | this.bestRSs.entrySet().iterator(); it.hasNext();) |
| | | { |
| | | final Entry<Integer, ReplicationServerInfo> entry = it.next(); |
| | | final Integer rsId = entry.getKey(); |
| | | final ReplicationServerInfo rsInfo = entry.getValue(); |
| | | if (rsInfo.getServerId() != bestRsId) |
| | | { |
| | | it.remove(); |
| | | } |
| | | rsEvals.put(rsId, rejectedRSsEval); |
| | | } |
| | | } |
| | | |
| | | private void discardAll(Message eval) |
| | | { |
| | | for (Integer rsId : bestRSs.keySet()) |
| | | { |
| | | rsEvals.put(rsId, eval); |
| | | } |
| | | } |
| | | |
| | | private boolean foundBestRS() |
| | | { |
| | | return bestRSs.size() == 1; |
| | | } |
| | | |
| | | /** |
| | | * Returns the {@link ReplicationServerInfo} for the best replication |
| | | * server. |
| | | * |
| | | * @return the {@link ReplicationServerInfo} for the best replication server |
| | | */ |
| | | ReplicationServerInfo getBestRS() |
| | | { |
| | | if (foundBestRS()) |
| | | { |
| | | return bestRSs.values().iterator().next(); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * Returns the evaluations for all the candidate replication servers. |
| | | * |
| | | * @return a Map of serverId => Message containing the evaluation for each |
| | | * candidate replication servers. |
| | | */ |
| | | Map<Integer, Message> getEvaluations() |
| | | { |
| | | if (foundBestRS()) |
| | | { |
| | | final Integer bestRSServerId = getBestRS().getServerId(); |
| | | if (rsEvals.get(bestRSServerId) == null) |
| | | { |
| | | final Message eval = NOTE_BEST_RS.get(bestRSServerId, localServerId); |
| | | rsEvals.put(bestRSServerId, eval); |
| | | } |
| | | } |
| | | return Collections.unmodifiableMap(rsEvals); |
| | | } |
| | | |
| | | /** |
| | | * Returns the evaluation for the supplied replication server Id. |
| | | * <p> |
| | | * Note: "unknown RS" message is returned if the supplied replication server |
| | | * was not part of the candidate replication servers. |
| | | * |
| | | * @param rsServerId |
| | | * the supplied replication server Id |
| | | * @return the evaluation {@link Message} for the supplied replication |
| | | * server Id |
| | | */ |
| | | private Message getEvaluation(int rsServerId) |
| | | { |
| | | final Message evaluation = getEvaluations().get(rsServerId); |
| | | if (evaluation != null) |
| | | { |
| | | return evaluation; |
| | | } |
| | | return NOTE_UNKNOWN_RS.get(rsServerId, localServerId); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return new StringBuilder() |
| | | .append("Current best replication server Ids: ").append( |
| | | this.bestRSs.keySet()).append( |
| | | ", Evaluation of connected replication servers").append( |
| | | " (ServerId => Evaluation): ").append(this.rsEvals.keySet()) |
| | | .append(", Any replication server not appearing here").append( |
| | | " could not be contacted.").toString(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Evaluation local to one filter. |
| | | */ |
| | | private static class LocalEvaluation |
| | | { |
| | | private final Map<Integer, ReplicationServerInfo> filteredRSs = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | private final Map<ReplicationServerInfo, Message> rsEvals = |
| | | new HashMap<ReplicationServerInfo, Message>(); |
| | | |
| | | private void accept(Integer rsId, ReplicationServerInfo rsInfo) |
| | | { |
| | | this.rsEvals.remove(rsInfo); // undo reject |
| | | this.filteredRSs.put(rsId, rsInfo); |
| | | } |
| | | |
| | | private void reject(ReplicationServerInfo rsInfo, Message reason) |
| | | { |
| | | this.filteredRSs.remove(rsInfo.getServerId()); // undo accept |
| | | this.rsEvals.put(rsInfo, reason); |
| | | } |
| | | |
| | | private Map<Integer, ReplicationServerInfo> getAccepted() |
| | | { |
| | | return filteredRSs; |
| | | } |
| | | |
| | | public Map<Integer, Message> getRejected() |
| | | { |
| | | final Map<Integer, Message> result = new HashMap<Integer, Message>(); |
| | | for (Entry<ReplicationServerInfo, Message> entry : rsEvals.entrySet()) |
| | | { |
| | | result.put(entry.getKey().getServerId(), entry.getValue()); |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | private boolean hasAcceptedAny() |
| | | { |
| | | return !filteredRSs.isEmpty(); |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Returns the replication server that best fits our need so that we can |
| | | * connect to it or determine if we must disconnect from current one to |
| | | * re-connect to best server. |
| | | * |
| | | * <p> |
| | | * Note: this method is static for test purpose (access from unit tests) |
| | | * |
| | | * |
| | | * @param firstConnection True if we run this method for the very first |
| | | * connection of the broker. False if we run this method to determine if the |
| | | * replication server we are currently connected to is still the best or not. |
| | |
| | | * disconnect (so the best replication server is another one than the current |
| | | * one). Null can only be returned when firstConnection is false. |
| | | */ |
| | | public static ReplicationServerInfo computeBestReplicationServer( |
| | | public static RSEvaluations computeBestReplicationServer( |
| | | boolean firstConnection, int rsServerId, ServerState myState, |
| | | Map<Integer, ReplicationServerInfo> rsInfos, int localServerId, |
| | | byte groupId, long generationId) |
| | | { |
| | | final RSEvaluations evals = new RSEvaluations(localServerId, rsInfos); |
| | | // Shortcut, if only one server, this is the best |
| | | if (rsInfos.size() == 1) |
| | | if (evals.foundBestRS()) |
| | | { |
| | | return rsInfos.values().iterator().next(); |
| | | return evals; |
| | | } |
| | | |
| | | /** |
| | |
| | | * local DS |
| | | * - replication server in the same VM as local DS one |
| | | */ |
| | | // TODO JNR log why an RS was evicted as best server |
| | | Map<Integer, ReplicationServerInfo> bestServers = rsInfos; |
| | | /* |
| | | The list of best replication servers is filtered with each criteria. At |
| | | each criteria, the list is replaced with the filtered one if there |
| | |
| | | the local configuration. When the current method is called, for |
| | | sure, at least one server from the list is locally configured |
| | | */ |
| | | bestServers = |
| | | keepBest(filterServersLocallyConfigured(bestServers), bestServers); |
| | | filterServersLocallyConfigured(evals, localServerId); |
| | | // Some servers with same group id ? |
| | | bestServers = |
| | | keepBest(filterServersWithSameGroupId(bestServers, groupId), |
| | | bestServers); |
| | | filterServersWithSameGroupId(evals, localServerId, groupId); |
| | | // Some servers with same generation id ? |
| | | Map<Integer, ReplicationServerInfo> sameGenerationId = |
| | | filterServersWithSameGenerationId(bestServers, generationId); |
| | | if (sameGenerationId.size() > 0) |
| | | final boolean rssWithSameGenerationIdExist = |
| | | filterServersWithSameGenerationId(evals, localServerId, generationId); |
| | | if (rssWithSameGenerationIdExist) |
| | | { |
| | | // If some servers with the right generation id this is useful to |
| | | // run the local DS change criteria |
| | | bestServers = |
| | | keepBest(filterServersWithAllLocalDSChanges(sameGenerationId, |
| | | myState, localServerId), sameGenerationId); |
| | | filterServersWithAllLocalDSChanges(evals, myState, localServerId); |
| | | } |
| | | // Some servers in the local VM or local host? |
| | | bestServers = keepBest(filterServersOnSameHost(bestServers), bestServers); |
| | | filterServersOnSameHost(evals, localServerId); |
| | | |
| | | /** |
| | | * Now apply the choice base on the weight to the best servers list |
| | | */ |
| | | if (bestServers.size() == 1) |
| | | if (evals.foundBestRS()) |
| | | { |
| | | return bestServers.values().iterator().next(); |
| | | return evals; |
| | | } |
| | | |
| | | /** |
| | | * Now apply the choice based on the weight to the best servers list |
| | | */ |
| | | if (firstConnection) |
| | | { |
| | | // We are not connected to a server yet |
| | | return computeBestServerForWeight(bestServers, -1, -1); |
| | | computeBestServerForWeight(evals, -1, -1); |
| | | } |
| | | /* |
| | | * We are already connected to a RS: compute the best RS as far as the |
| | | * weights is concerned. If this is another one, some DS must disconnect. |
| | | */ |
| | | return computeBestServerForWeight(bestServers, rsServerId, localServerId); |
| | | } |
| | | |
| | | /** |
| | | * If the filtered Map is not empty then it is returned, else return the |
| | | * original unfiltered Map. |
| | | * |
| | | * @return the best fit Map between the filtered Map and the original |
| | | * unfiltered Map. |
| | | */ |
| | | private static <K, V> Map<K, V> keepBest(Map<K, V> filteredMap, |
| | | Map<K, V> unfilteredMap) |
| | | { |
| | | if (!filteredMap.isEmpty()) |
| | | else |
| | | { |
| | | return filteredMap; |
| | | /* |
| | | * We are already connected to a RS: compute the best RS as far as the |
| | | * weights is concerned. If this is another one, some DS must disconnect. |
| | | */ |
| | | computeBestServerForWeight(evals, rsServerId, localServerId); |
| | | } |
| | | return unfilteredMap; |
| | | return evals; |
| | | } |
| | | |
| | | /** |
| | | * Creates a new list that contains only replication servers that are locally |
| | | * configured. |
| | | * @param bestServers The list of replication servers to filter |
| | | * @return The sub list of replication servers locally configured |
| | | * @param evals The evaluation object |
| | | */ |
| | | private static Map<Integer, ReplicationServerInfo> |
| | | filterServersLocallyConfigured(Map<Integer, |
| | | ReplicationServerInfo> bestServers) |
| | | private static void filterServersLocallyConfigured(RSEvaluations evals, |
| | | int localServerId) |
| | | { |
| | | Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | final LocalEvaluation eval = new LocalEvaluation(); |
| | | for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet()) |
| | | { |
| | | ReplicationServerInfo rsInfo = entry.getValue(); |
| | | final Integer rsId = entry.getKey(); |
| | | final ReplicationServerInfo rsInfo = entry.getValue(); |
| | | if (rsInfo.isLocallyConfigured()) |
| | | { |
| | | result.put(entry.getKey(), rsInfo); |
| | | eval.accept(rsId, rsInfo); |
| | | } |
| | | else |
| | | { |
| | | eval.reject(rsInfo, |
| | | NOTE_RS_NOT_LOCALLY_CONFIGURED.get(rsId, localServerId)); |
| | | } |
| | | } |
| | | return result; |
| | | evals.keepBest(eval); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new list that contains only replication servers that have the |
| | | * passed group id, from a passed replication server list. |
| | | * @param bestServers The list of replication servers to filter |
| | | * @param evals The evaluation object |
| | | * @param groupId The group id that must match |
| | | * @return The sub list of replication servers matching the requested group id |
| | | * (which may be empty) |
| | | */ |
| | | private static Map<Integer, ReplicationServerInfo> |
| | | filterServersWithSameGroupId(Map<Integer, |
| | | ReplicationServerInfo> bestServers, byte groupId) |
| | | private static void filterServersWithSameGroupId(RSEvaluations evals, |
| | | int localServerId, byte groupId) |
| | | { |
| | | Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | final LocalEvaluation eval = new LocalEvaluation(); |
| | | for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet()) |
| | | { |
| | | ReplicationServerInfo rsInfo = entry.getValue(); |
| | | final Integer rsId = entry.getKey(); |
| | | final ReplicationServerInfo rsInfo = entry.getValue(); |
| | | if (rsInfo.getGroupId() == groupId) |
| | | { |
| | | result.put(entry.getKey(), rsInfo); |
| | | eval.accept(rsId, rsInfo); |
| | | } |
| | | else |
| | | { |
| | | eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS.get( |
| | | rsId, rsInfo.getGroupId(), localServerId, groupId)); |
| | | } |
| | | } |
| | | return result; |
| | | evals.keepBest(eval); |
| | | } |
| | | |
| | | /** |
| | |
| | | * then the 'empty'(generationId==-1) replication servers are also included |
| | | * in the result list. |
| | | * |
| | | * @param bestServers The list of replication servers to filter |
| | | * @param evals The evaluation object |
| | | * @param generationId The generation id that must match |
| | | * @return The sub list of replication servers matching the requested |
| | | * generation id (which may be empty) |
| | | * @return whether some replication server passed the filter |
| | | */ |
| | | private static Map<Integer, ReplicationServerInfo> |
| | | filterServersWithSameGenerationId(Map<Integer, |
| | | ReplicationServerInfo> bestServers, long generationId) |
| | | private static boolean filterServersWithSameGenerationId( |
| | | RSEvaluations evals, long localServerId, long generationId) |
| | | { |
| | | Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; |
| | | final LocalEvaluation eval = new LocalEvaluation(); |
| | | boolean emptyState = true; |
| | | |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo rsInfo = entry.getValue(); |
| | | final Integer rsId = entry.getKey(); |
| | | final ReplicationServerInfo rsInfo = entry.getValue(); |
| | | if (rsInfo.getGenerationId() == generationId) |
| | | { |
| | | result.put(entry.getKey(), rsInfo); |
| | | eval.accept(rsId, rsInfo); |
| | | if (!rsInfo.serverState.isEmpty()) |
| | | emptyState = false; |
| | | } |
| | | else if (rsInfo.getGenerationId() == -1) |
| | | { |
| | | eval.reject(rsInfo, NOTE_RS_HAS_NO_GENERATION_ID.get(rsId, |
| | | generationId, localServerId)); |
| | | } |
| | | else |
| | | { |
| | | eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS.get( |
| | | rsId, rsInfo.getGenerationId(), localServerId, generationId)); |
| | | } |
| | | } |
| | | |
| | | if (emptyState) |
| | |
| | | ReplicationServerInfo rsInfo = entry.getValue(); |
| | | if (rsInfo.getGenerationId() == -1) |
| | | { |
| | | result.put(entry.getKey(), rsInfo); |
| | | // will undo the reject of previously rejected RSs |
| | | eval.accept(entry.getKey(), rsInfo); |
| | | } |
| | | } |
| | | } |
| | | return result; |
| | | |
| | | return evals.keepBest(eval); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new list that contains only replication servers that have the |
| | | * latest changes from the passed DS, from a passed replication server list. |
| | | * @param bestServers The list of replication servers to filter |
| | | * @param evals The evaluation object |
| | | * @param localState The state of the local DS |
| | | * @param localServerId The server id to consider for the changes |
| | | * @return The sub list of replication servers that have the latest changes |
| | | * from the passed DS (which may be empty) |
| | | */ |
| | | private static Map<Integer, ReplicationServerInfo> |
| | | filterServersWithAllLocalDSChanges(Map<Integer, |
| | | ReplicationServerInfo> bestServers, ServerState localState, |
| | | int localServerId) |
| | | private static void filterServersWithAllLocalDSChanges( |
| | | RSEvaluations evals, ServerState localState, int localServerId) |
| | | { |
| | | Map<Integer, ReplicationServerInfo> upToDateServers = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | Map<Integer, ReplicationServerInfo> moreUpToDateServers = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | // Extract the CSN of the latest change generated by the local server |
| | | CSN myCSN = localState.getCSN(localServerId); |
| | | if (myCSN == null) |
| | | { |
| | | myCSN = new CSN(0, 0, localServerId); |
| | | } |
| | | final CSN localCSN = getCSN(localState, localServerId); |
| | | |
| | | /** |
| | | * Find replication servers that are up to date (or more up to date than us, |
| | |
| | | * server id. If some servers are more up to date, prefer this list but take |
| | | * only the latest CSN. |
| | | */ |
| | | final LocalEvaluation mostUpToDateEval = new LocalEvaluation(); |
| | | boolean foundRSMoreUpToDateThanLocalDS = false; |
| | | CSN latestRsCSN = null; |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet()) |
| | | { |
| | | final Integer rsId = entry.getKey(); |
| | | final ReplicationServerInfo rsInfo = entry.getValue(); |
| | | CSN rsCSN = rsInfo.getServerState().getCSN(localServerId); |
| | | if (rsCSN == null) |
| | | { |
| | | rsCSN = new CSN(0, 0, localServerId); |
| | | } |
| | | final CSN rsCSN = getCSN(rsInfo.getServerState(), localServerId); |
| | | |
| | | // Has this replication server the latest local change ? |
| | | if (myCSN.isOlderThanOrEqualTo(rsCSN)) |
| | | if (rsCSN.isOlderThan(localCSN)) |
| | | { |
| | | if (myCSN.equals(rsCSN)) |
| | | mostUpToDateEval.reject(rsInfo, NOTE_RS_LATER_THAN_LOCAL_DS.get( |
| | | rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI())); |
| | | } |
| | | else if (rsCSN.equals(localCSN)) |
| | | { |
| | | // This replication server has exactly the latest change from the |
| | | // local server |
| | | if (!foundRSMoreUpToDateThanLocalDS) |
| | | { |
| | | // This replication server has exactly the latest change from the |
| | | // local server |
| | | upToDateServers.put(rsId, rsInfo); |
| | | } else |
| | | mostUpToDateEval.accept(rsId, rsInfo); |
| | | } |
| | | else |
| | | { |
| | | // This replication server is even more up to date than the local |
| | | // server |
| | | if (latestRsCSN == null) |
| | | { |
| | | // Initialize the latest CSN |
| | | latestRsCSN = rsCSN; |
| | | } |
| | | if (rsCSN.isNewerThanOrEqualTo(latestRsCSN)) |
| | | { |
| | | if (rsCSN.equals(latestRsCSN)) |
| | | { |
| | | moreUpToDateServers.put(rsId, rsInfo); |
| | | } else |
| | | { |
| | | // This RS is even more up to date, clear the list and store this |
| | | // new RS |
| | | moreUpToDateServers.clear(); |
| | | moreUpToDateServers.put(rsId, rsInfo); |
| | | latestRsCSN = rsCSN; |
| | | } |
| | | } |
| | | mostUpToDateEval.reject(rsInfo, |
| | | NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get( |
| | | rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI())); |
| | | } |
| | | } |
| | | else if (rsCSN.isNewerThan(localCSN)) |
| | | { |
| | | // This replication server is even more up to date than the local server |
| | | if (latestRsCSN == null) |
| | | { |
| | | foundRSMoreUpToDateThanLocalDS = true; |
| | | // all previous results are now outdated, reject them all |
| | | rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId, |
| | | localCSN); |
| | | // Initialize the latest CSN |
| | | latestRsCSN = rsCSN; |
| | | } |
| | | |
| | | if (rsCSN.equals(latestRsCSN)) |
| | | { |
| | | mostUpToDateEval.accept(rsId, rsInfo); |
| | | } |
| | | else if (rsCSN.isNewerThan(latestRsCSN)) |
| | | { |
| | | // This RS is even more up to date, reject all previously accepted RSs |
| | | // and store this new RS |
| | | rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId, |
| | | localCSN); |
| | | mostUpToDateEval.accept(rsId, rsInfo); |
| | | latestRsCSN = rsCSN; |
| | | } |
| | | else |
| | | { |
| | | mostUpToDateEval.reject(rsInfo, |
| | | NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get( |
| | | rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI())); |
| | | } |
| | | } |
| | | } |
| | | if (moreUpToDateServers.size() > 0) |
| | | { |
| | | // Prefer servers more up to date than local server |
| | | return moreUpToDateServers; |
| | | } |
| | | return upToDateServers; |
| | | evals.keepBest(mostUpToDateEval); |
| | | } |
| | | |
| | | private static CSN getCSN(ServerState localState, int localServerId) |
| | | { |
| | | final CSN csn = localState.getCSN(localServerId); |
| | | if (csn != null) |
| | | { |
| | | return csn; |
| | | } |
| | | return new CSN(0, 0, localServerId); |
| | | } |
| | | |
| | | private static void rejectAllWithRSIsLaterThanBestRS( |
| | | final LocalEvaluation eval, int localServerId, CSN localCSN) |
| | | { |
| | | for (ReplicationServerInfo rsInfo : eval.getAccepted().values()) |
| | | { |
| | | final String rsCSN = |
| | | getCSN(rsInfo.getServerState(), localServerId).toStringUI(); |
| | | final Message reason = |
| | | NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get( |
| | | rsInfo.getServerId(), rsCSN, localServerId, localCSN.toStringUI()); |
| | | eval.reject(rsInfo, reason); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new list that contains only replication servers that are on the |
| | |
| | | * method will gives priority to any replication server which is in the same |
| | | * VM as this DS. |
| | | * |
| | | * @param bestServers |
| | | * The list of replication servers to filter |
| | | * @return The sub list of replication servers being on the same host as the |
| | | * local DS (which may be empty) |
| | | * @param evals The evaluation object |
| | | */ |
| | | private static Map<Integer, ReplicationServerInfo> filterServersOnSameHost( |
| | | Map<Integer, ReplicationServerInfo> bestServers) |
| | | private static void filterServersOnSameHost(RSEvaluations evals, |
| | | int localServerId) |
| | | { |
| | | /* |
| | | * Initially look for all servers on the same host. If we find one in the |
| | | * same VM, then narrow the search. |
| | | */ |
| | | boolean foundRSInSameVM = false; |
| | | final Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | final LocalEvaluation eval = new LocalEvaluation(); |
| | | for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet()) |
| | | { |
| | | final Integer rsId = entry.getKey(); |
| | | final ReplicationServerInfo rsInfo = entry.getValue(); |
| | |
| | | { |
| | | // An RS in the same VM will always have priority. |
| | | // Narrow the search to only include servers in this VM. |
| | | result.clear(); |
| | | rejectAllWithRSOnDifferentVMThanDS(eval, localServerId); |
| | | foundRSInSameVM = true; |
| | | } |
| | | result.put(rsId, rsInfo); |
| | | eval.accept(rsId, rsInfo); |
| | | } |
| | | else if (!foundRSInSameVM) |
| | | { |
| | | // OK, accept RSs on the same machine because we have not found an RS |
| | | // in the same VM yet |
| | | result.put(rsId, rsInfo); |
| | | eval.accept(rsId, rsInfo); |
| | | } |
| | | else |
| | | { |
| | | // Skip: we have found some RSs in the same VM, but this RS is not. |
| | | eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(rsId, |
| | | localServerId)); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_HOST_THAN_DS.get(rsId, |
| | | localServerId)); |
| | | } |
| | | } |
| | | return result; |
| | | evals.keepBest(eval); |
| | | } |
| | | |
| | | private static void rejectAllWithRSOnDifferentVMThanDS(LocalEvaluation eval, |
| | | int localServerId) |
| | | { |
| | | for (ReplicationServerInfo rsInfo : eval.getAccepted().values()) |
| | | { |
| | | eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get( |
| | | rsInfo.getServerId(), localServerId)); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | * Warning: This method is expected to be called with at least 2 servers in |
| | | * bestServers |
| | | * Note: this method is static for test purpose (access from unit tests) |
| | | * @param bestServers The list of replication servers to consider |
| | | * @param evals The evaluation object |
| | | * @param currentRsServerId The replication server the local server is |
| | | * currently connected to. -1 if the local server is not yet connected |
| | | * to any replication server. |
| | | * @param localServerId The server id of the local server. This is not used |
| | | * when it is not connected to a replication server |
| | | * (currentRsServerId = -1) |
| | | * @return The replication server the local server should be connected to |
| | | * as far as the weight is concerned. This may be the currently used one if |
| | | * the weight is correctly spread. If the returned value is null, the best |
| | | * replication server is undetermined but the local server must disconnect |
| | | * (so the best replication server is another one than the current one). |
| | | */ |
| | | public static ReplicationServerInfo computeBestServerForWeight( |
| | | Map<Integer, ReplicationServerInfo> bestServers, int currentRsServerId, |
| | | int localServerId) |
| | | public static void computeBestServerForWeight(RSEvaluations evals, |
| | | int currentRsServerId, int localServerId) |
| | | { |
| | | final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; |
| | | /* |
| | | * - Compute the load goal of each RS, deducing it from the weights affected |
| | | * to them. |
| | |
| | | { |
| | | // The local server is not connected yet, find best server to connect to, |
| | | // taking the weights into account. |
| | | return computeBestServerWhenNotConnected(bestServers, loadDistances); |
| | | computeBestServerWhenNotConnected(evals, loadDistances, localServerId); |
| | | } |
| | | // The local server is currently connected to a RS, let's see if it must |
| | | // disconnect or not, taking the weights into account. |
| | | return computeBestServerWhenConnected(bestServers, loadDistances, |
| | | localServerId, currentRsServerId, sumOfWeights, sumOfConnectedDSs); |
| | | else |
| | | { |
| | | // The local server is currently connected to a RS, let's see if it must |
| | | // disconnect or not, taking the weights into account. |
| | | computeBestServerWhenConnected(evals, loadDistances, localServerId, |
| | | currentRsServerId, sumOfWeights, sumOfConnectedDSs); |
| | | } |
| | | } |
| | | |
| | | private static ReplicationServerInfo computeBestServerWhenNotConnected( |
| | | Map<Integer, ReplicationServerInfo> bestServers, |
| | | Map<Integer, BigDecimal> loadDistances) |
| | | private static void computeBestServerWhenNotConnected(RSEvaluations evals, |
| | | Map<Integer, BigDecimal> loadDistances, int localServerId) |
| | | { |
| | | final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; |
| | | /* |
| | | * Find the server with the current highest distance to its load goal and |
| | | * choose it. Make an exception if every server is correctly balanced, |
| | |
| | | // Choose server with the highest weight |
| | | bestRsId = highestWeightRsId; |
| | | } |
| | | return bestServers.get(bestRsId); |
| | | evals.setBestRS(bestRsId, NOTE_BIGGEST_WEIGHT_RS.get(localServerId, |
| | | bestRsId)); |
| | | } |
| | | |
| | | private static ReplicationServerInfo computeBestServerWhenConnected( |
| | | Map<Integer, ReplicationServerInfo> bestServers, |
| | | private static void computeBestServerWhenConnected(RSEvaluations evals, |
| | | Map<Integer, BigDecimal> loadDistances, int localServerId, |
| | | int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs) |
| | | { |
| | | final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; |
| | | final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP); |
| | | float currentLoadDistance = |
| | | loadDistances.get(currentRsServerId).floatValue(); |
| | |
| | | { |
| | | // Avoid the yoyo effect, and keep the local DS connected to its |
| | | // current RS |
| | | return bestServers.get(currentRsServerId); |
| | | evals.setBestRS(currentRsServerId, |
| | | NOTE_AVOID_YOYO_EFFECT.get(localServerId, currentRsServerId)); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // Prepare a sorted list (from lowest to highest) or DS server ids |
| | | // connected to the current RS |
| | | ReplicationServerInfo currentRsInfo = |
| | | bestServers.get(currentRsServerId); |
| | | List<Integer> serversConnectedToCurrentRS = |
| | | new ArrayList<Integer>(currentRsInfo.getConnectedDSs()); |
| | | Collections.sort(serversConnectedToCurrentRS); |
| | | |
| | | // Go through the list of DSs to disconnect and see if the local |
| | | // server is part of them. |
| | | int index = 0; |
| | | while (overloadingDSsNumber > 0) |
| | | bestServers.get(currentRsServerId); |
| | | if (isServerOverloadingRS(localServerId, currentRsInfo, |
| | | overloadingDSsNumber)) |
| | | { |
| | | int serverIdToDisconnect = serversConnectedToCurrentRS.get(index); |
| | | if (serverIdToDisconnect == localServerId) |
| | | { |
| | | // The local server is part of the DSs to disconnect |
| | | return null; |
| | | } |
| | | overloadingDSsNumber--; |
| | | index++; |
| | | // The local server is part of the DSs to disconnect |
| | | evals.discardAll(NOTE_DISCONNECT_DS_FROM_OVERLOADED_RS.get( |
| | | localServerId, currentRsServerId)); |
| | | } |
| | | |
| | | // The local server is not part of the servers to disconnect from the |
| | | // current RS. |
| | | else |
| | | { |
| | | // The local server is not part of the servers to disconnect from the |
| | | // current RS. |
| | | evals.setBestRS(currentRsServerId, |
| | | NOTE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS.get(localServerId, |
| | | currentRsServerId)); |
| | | } |
| | | } else { |
| | | // The average distance of the other RSs does not show a lack of DSs: |
| | | // no need to disconnect any DS from the current RS. |
| | | evals.setBestRS(currentRsServerId, |
| | | NOTE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS.get(localServerId, |
| | | currentRsServerId)); |
| | | } |
| | | } else { |
| | | // The RS load goal is reached or there are not enough DSs connected to |
| | | // it to reach it: do not disconnect from this RS and return rsInfo for |
| | | // this RS |
| | | evals.setBestRS(currentRsServerId, |
| | | NOTE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS.get(localServerId, |
| | | currentRsServerId)); |
| | | } |
| | | return bestServers.get(currentRsServerId); |
| | | } |
| | | |
| | | /** |
| | | * Returns whether the local DS is overloading the RS. |
| | | * <p> |
| | | * There are an "overloadingDSsNumber" of DS overloading the RS. The list of |
| | | * DSs connected to this RS is ordered by serverId to use a consistent |
| | | * ordering across all nodes in the topology. The serverIds which index in the |
| | | * List are lower than "overloadingDSsNumber" will be evicted first. |
| | | * <p> |
| | | * This ordering is unfair since nodes with the lower serverIds will be |
| | | * evicted more often than nodes with higher serverIds. However, it is a |
| | | * consistent and reliable ordering applicable anywhere in the topology. |
| | | */ |
| | | private static boolean isServerOverloadingRS(int localServerId, |
| | | ReplicationServerInfo currentRsInfo, int overloadingDSsNumber) |
| | | { |
| | | List<Integer> serversConnectedToCurrentRS = |
| | | new ArrayList<Integer>(currentRsInfo.getConnectedDSs()); |
| | | Collections.sort(serversConnectedToCurrentRS); |
| | | |
| | | final int idx = serversConnectedToCurrentRS.indexOf(localServerId); |
| | | return idx != -1 && idx < overloadingDSsNumber; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(this + " end restart : connected=" + connected |
| | | + " with RSid=" + getRsServerId() + " genid=" + this.generationID); |
| | | debugInfo("end restart : connected=" + connected + " with RS(" |
| | | + getRsServerId() + ") genId=" + this.generationID); |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("ReplicationBroker.publish() Publishing a " |
| | | + "message is not possible due to existing connection error."); |
| | | debugInfo("publish(): Publishing a message is not possible due to" |
| | | + " existing connection error."); |
| | | } |
| | | |
| | | return false; |
| | |
| | | // ignore |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("ReplicationBroker.publish() " |
| | | + "Interrupted exception raised : " + e.getLocalizedMessage()); |
| | | debugInfo("publish(): Interrupted exception raised : " |
| | | + e.getLocalizedMessage()); |
| | | } |
| | | } |
| | | } |
| | |
| | | // just loop. |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("ReplicationBroker.publish() " |
| | | + "Interrupted exception raised." + e.getLocalizedMessage()); |
| | | debugInfo("publish(): Interrupted exception raised." |
| | | + e.getLocalizedMessage()); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | // Stable topology (no topo msg since few seconds): proceed with |
| | | // best server checking. |
| | | final ReplicationServerInfo bestServerInfo = |
| | | final RSEvaluations evals = |
| | | computeBestReplicationServer(false, previousRsServerID, state, |
| | | replicationServerInfos, serverId, getGroupId(), |
| | | generationID); |
| | | replicationServerInfos, serverId, getGroupId(), generationID); |
| | | final ReplicationServerInfo bestServerInfo = evals.getBestRS(); |
| | | if (previousRsServerID != -1 |
| | | && (bestServerInfo == null |
| | | || bestServerInfo.getServerId() != previousRsServerID)) |
| | |
| | | } |
| | | else |
| | | { |
| | | // TODO JNR log why an RS was evicted as best server |
| | | final int bestRsServerId = bestServerInfo.getServerId(); |
| | | message = NOTE_NEW_BEST_REPLICATION_SERVER.get( |
| | | serverId, previousRsServerID, |
| | | localSession.getReadableRemoteAddress(), |
| | | bestServerInfo.getServerId(), |
| | | baseDN.toNormalizedString()); |
| | | bestRsServerId, |
| | | baseDN.toNormalizedString(), |
| | | evals.getEvaluation(previousRsServerID).toString(), |
| | | evals.getEvaluation(bestRsServerId).toString()); |
| | | } |
| | | logError(message); |
| | | if (debugEnabled()) |
| | | debugInfo("best replication servers evaluation results: " |
| | | + evals); |
| | | reStart(true); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Gets the States of all the Replicas currently in the |
| | | * Topology. |
| | | * When this method is called, a Monitoring message will be sent |
| | | * to the Replication Server to which this domain is currently connected |
| | | * so that it computes a table containing information about |
| | | * all Directory Servers in the topology. |
| | | * This Computation involves communications will all the servers |
| | | * currently connected and |
| | | * Gets the States of all the Replicas currently in the Topology. When this |
| | | * method is called, a Monitoring message will be sent to the Replication |
| | | * Server to which this domain is currently connected so that it computes a |
| | | * table containing information about all Directory Servers in the topology. |
| | | * This Computation involves communications will all the servers currently |
| | | * connected and |
| | | * |
| | | * @return The States of all Replicas in the topology (except us) |
| | | */ |
| | |
| | | public void stop() |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("ReplicationBroker " + getServerId() + " is stopping" |
| | | + " and will close the connection to replication server " |
| | | + rsServerId + " for domain " + getBaseDN()); |
| | | debugInfo("is stopping and will close the connection to" |
| | | + " replication server " + rsServerId); |
| | | |
| | | synchronized (startStopLock) |
| | | { |
| | |
| | | public void receiveTopo(TopologyMsg topoMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(this + " receive TopologyMsg=" + topoMsg); |
| | | debugInfo("receive TopologyMsg=" + topoMsg); |
| | | |
| | | // Store new DS list |
| | | dsList = topoMsg.getDsList(); |
| | |
| | | else |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(this |
| | | + " is not configured to send CSN heartbeat interval"); |
| | | debugInfo("is not configured to send CSN heartbeat interval"); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | return sb.toString(); |
| | | } |
| | | |
| | | private void debugInfo(String message) |
| | | { |
| | | TRACER.debugInfo(getClass().getSimpleName() + " for baseDN=" + getBaseDN() |
| | | + " and serverId=" + getServerId() + " " + message); |
| | | } |
| | | } |