From 5c3362e4dd1ee79c10160cfd128b70cdb9a2dee1 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 28 Oct 2013 09:06:07 +0000
Subject: [PATCH] OPENDJ-1053 (CR-2520) Improve logging of replication load balancing and fail-over in order to diagnose the causes of these events
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 722 +++++++++++++++++++++++++++++++++++++------------------
1 files changed, 485 insertions(+), 237 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 1072f55..e755741 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -806,9 +806,8 @@
* out which one is the best to connect to.
*/
if (debugEnabled())
- TRACER.debugInfo("serverId: " + serverId
- + " phase 1 : will perform PhaseOneH with each RS in "
- + " order to elect the preferred one");
+ debugInfo("phase 1 : will perform PhaseOneH with each RS in order to "
+ + "elect the preferred one");
// Get info from every available replication servers
replicationServerInfos = collectReplicationServersInfo();
@@ -818,14 +817,14 @@
if (replicationServerInfos.size() > 0)
{
// At least one server answered, find the best one.
- electedRsInfo = computeBestReplicationServer(true, -1, state,
- replicationServerInfos, serverId, getGroupId(), getGenerationID());
+ RSEvaluations evals = computeBestReplicationServer(true, -1, state,
+ replicationServerInfos, serverId, getGroupId(), getGenerationID());
+ electedRsInfo = evals.getBestRS();
// Best found, now initialize connection to this one (handshake phase 1)
if (debugEnabled())
- TRACER.debugInfo("serverId: " + serverId
- + " phase 2 : will perform PhaseOneH with the preferred RS="
- + electedRsInfo);
+ debugInfo("phase 2 : will perform PhaseOneH with the preferred RS="
+ + electedRsInfo);
electedRsInfo = performPhaseOneHandshake(
electedRsInfo.getServerURL(), true, false);
@@ -1056,8 +1055,7 @@
int nChanges = ServerState.diffChanges(rsState, state);
if (debugEnabled())
{
- TRACER.debugInfo("RB for dn " + getBaseDN() + " and with server id "
- + getServerId() + " computed " + nChanges + " changes late.");
+ debugInfo("computed " + nChanges + " changes late.");
}
/*
@@ -1136,8 +1134,8 @@
ReplicationMsg msg = localSession.receive();
if (debugEnabled())
{
- TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
- + serverStartMsg + "\nAND RECEIVED:\n" + msg);
+ debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n"
+ + msg);
}
// Wrap received message in a server info object
@@ -1253,8 +1251,7 @@
// FIXME ECL In the handshake phase two, should RS send back a topo msg ?
if (debugEnabled())
{
- TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
- + startECLSessionMsg);
+ TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg);
}
// Alright set the timeout to the desired value
@@ -1320,8 +1317,8 @@
if (debugEnabled())
{
- TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
- + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg);
+ TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg
+ + "\nAND RECEIVED:\n" + topologyMsg);
}
// Alright set the timeout to the desired value
@@ -1342,13 +1339,203 @@
}
/**
+ * Class holding evaluation results for electing the best replication server
+ * for the local directory server.
+ */
+ static class RSEvaluations
+ {
+
+ private final int localServerId;
+ private Map<Integer, ReplicationServerInfo> bestRSs;
+ private final Map<Integer, Message> rsEvals =
+ new HashMap<Integer, Message>();
+
+ /**
+ * Ctor.
+ *
+ * @param localServerId
+ * the serverId for the local directory server
+ * @param rsInfos
+ * a Map of serverId => {@link ReplicationServerInfo} with all the
+ * candidate replication servers
+ */
+ RSEvaluations(int localServerId,
+ Map<Integer, ReplicationServerInfo> rsInfos)
+ {
+ this.localServerId = localServerId;
+ this.bestRSs = rsInfos;
+ }
+
+ private boolean keepBest(LocalEvaluation eval)
+ {
+ if (eval.hasAcceptedAny())
+ {
+ bestRSs = eval.getAccepted();
+ rsEvals.putAll(eval.getRejected());
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Sets the elected best replication server, rejecting all the other
+ * replication servers with the supplied evaluation.
+ *
+ * @param bestRsId
+ * the serverId of the elected replication server
+ * @param rejectedRSsEval
+ * the evaluation for all the rejected replication servers
+ */
+ private void setBestRS(int bestRsId, Message rejectedRSsEval)
+ {
+ for (Iterator<Entry<Integer, ReplicationServerInfo>> it =
+ this.bestRSs.entrySet().iterator(); it.hasNext();)
+ {
+ final Entry<Integer, ReplicationServerInfo> entry = it.next();
+ final Integer rsId = entry.getKey();
+ final ReplicationServerInfo rsInfo = entry.getValue();
+ if (rsInfo.getServerId() != bestRsId)
+ {
+ it.remove();
+ }
+ rsEvals.put(rsId, rejectedRSsEval);
+ }
+ }
+
+ private void discardAll(Message eval)
+ {
+ for (Integer rsId : bestRSs.keySet())
+ {
+ rsEvals.put(rsId, eval);
+ }
+ }
+
+ private boolean foundBestRS()
+ {
+ return bestRSs.size() == 1;
+ }
+
+ /**
+ * Returns the {@link ReplicationServerInfo} for the best replication
+ * server.
+ *
+ * @return the {@link ReplicationServerInfo} for the best replication server
+ */
+ ReplicationServerInfo getBestRS()
+ {
+ if (foundBestRS())
+ {
+ return bestRSs.values().iterator().next();
+ }
+ return null;
+ }
+
+ /**
+ * Returns the evaluations for all the candidate replication servers.
+ *
+ * @return a Map of serverId => Message containing the evaluation for each
+ * candidate replication servers.
+ */
+ Map<Integer, Message> getEvaluations()
+ {
+ if (foundBestRS())
+ {
+ final Integer bestRSServerId = getBestRS().getServerId();
+ if (rsEvals.get(bestRSServerId) == null)
+ {
+ final Message eval = NOTE_BEST_RS.get(bestRSServerId, localServerId);
+ rsEvals.put(bestRSServerId, eval);
+ }
+ }
+ return Collections.unmodifiableMap(rsEvals);
+ }
+
+ /**
+ * Returns the evaluation for the supplied replication server Id.
+ * <p>
+ * Note: "unknown RS" message is returned if the supplied replication server
+ * was not part of the candidate replication servers.
+ *
+ * @param rsServerId
+ * the supplied replication server Id
+ * @return the evaluation {@link Message} for the supplied replication
+ * server Id
+ */
+ private Message getEvaluation(int rsServerId)
+ {
+ final Message evaluation = getEvaluations().get(rsServerId);
+ if (evaluation != null)
+ {
+ return evaluation;
+ }
+ return NOTE_UNKNOWN_RS.get(rsServerId, localServerId);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return new StringBuilder()
+ .append("Current best replication server Ids: ").append(
+ this.bestRSs.keySet()).append(
+ ", Evaluation of connected replication servers").append(
+ " (ServerId => Evaluation): ").append(this.rsEvals.keySet())
+ .append(", Any replication server not appearing here").append(
+ " could not be contacted.").toString();
+ }
+ }
+
+ /**
+ * Evaluation local to one filter.
+ */
+ private static class LocalEvaluation
+ {
+ private final Map<Integer, ReplicationServerInfo> filteredRSs =
+ new HashMap<Integer, ReplicationServerInfo>();
+ private final Map<ReplicationServerInfo, Message> rsEvals =
+ new HashMap<ReplicationServerInfo, Message>();
+
+ private void accept(Integer rsId, ReplicationServerInfo rsInfo)
+ {
+ this.rsEvals.remove(rsInfo); // undo reject
+ this.filteredRSs.put(rsId, rsInfo);
+ }
+
+ private void reject(ReplicationServerInfo rsInfo, Message reason)
+ {
+ this.filteredRSs.remove(rsInfo.getServerId()); // undo accept
+ this.rsEvals.put(rsInfo, reason);
+ }
+
+ private Map<Integer, ReplicationServerInfo> getAccepted()
+ {
+ return filteredRSs;
+ }
+
+ public Map<Integer, Message> getRejected()
+ {
+ final Map<Integer, Message> result = new HashMap<Integer, Message>();
+ for (Entry<ReplicationServerInfo, Message> entry : rsEvals.entrySet())
+ {
+ result.put(entry.getKey().getServerId(), entry.getValue());
+ }
+ return result;
+ }
+
+ private boolean hasAcceptedAny()
+ {
+ return !filteredRSs.isEmpty();
+ }
+
+ }
+
+ /**
* Returns the replication server that best fits our need so that we can
* connect to it or determine if we must disconnect from current one to
* re-connect to best server.
- *
+ * <p>
* Note: this method is static for test purpose (access from unit tests)
*
- *
* @param firstConnection True if we run this method for the very first
* connection of the broker. False if we run this method to determine if the
* replication server we are currently connected to is still the best or not.
@@ -1365,15 +1552,16 @@
* disconnect (so the best replication server is another one than the current
* one). Null can only be returned when firstConnection is false.
*/
- public static ReplicationServerInfo computeBestReplicationServer(
+ public static RSEvaluations computeBestReplicationServer(
boolean firstConnection, int rsServerId, ServerState myState,
Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
byte groupId, long generationId)
{
+ final RSEvaluations evals = new RSEvaluations(localServerId, rsInfos);
// Shortcut, if only one server, this is the best
- if (rsInfos.size() == 1)
+ if (evals.foundBestRS())
{
- return rsInfos.values().iterator().next();
+ return evals;
}
/**
@@ -1386,8 +1574,6 @@
* local DS
* - replication server in the same VM as local DS one
*/
- // TODO JNR log why an RS was evicted as best server
- Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
/*
The list of best replication servers is filtered with each criteria. At
each criteria, the list is replaced with the filtered one if there
@@ -1398,109 +1584,96 @@
the local configuration. When the current method is called, for
sure, at least one server from the list is locally configured
*/
- bestServers =
- keepBest(filterServersLocallyConfigured(bestServers), bestServers);
+ filterServersLocallyConfigured(evals, localServerId);
// Some servers with same group id ?
- bestServers =
- keepBest(filterServersWithSameGroupId(bestServers, groupId),
- bestServers);
+ filterServersWithSameGroupId(evals, localServerId, groupId);
// Some servers with same generation id ?
- Map<Integer, ReplicationServerInfo> sameGenerationId =
- filterServersWithSameGenerationId(bestServers, generationId);
- if (sameGenerationId.size() > 0)
+ final boolean rssWithSameGenerationIdExist =
+ filterServersWithSameGenerationId(evals, localServerId, generationId);
+ if (rssWithSameGenerationIdExist)
{
// If some servers with the right generation id this is useful to
// run the local DS change criteria
- bestServers =
- keepBest(filterServersWithAllLocalDSChanges(sameGenerationId,
- myState, localServerId), sameGenerationId);
+ filterServersWithAllLocalDSChanges(evals, myState, localServerId);
}
// Some servers in the local VM or local host?
- bestServers = keepBest(filterServersOnSameHost(bestServers), bestServers);
+ filterServersOnSameHost(evals, localServerId);
- /**
- * Now apply the choice base on the weight to the best servers list
- */
- if (bestServers.size() == 1)
+ if (evals.foundBestRS())
{
- return bestServers.values().iterator().next();
+ return evals;
}
+ /**
+ * Now apply the choice based on the weight to the best servers list
+ */
if (firstConnection)
{
// We are not connected to a server yet
- return computeBestServerForWeight(bestServers, -1, -1);
+ computeBestServerForWeight(evals, -1, -1);
}
- /*
- * We are already connected to a RS: compute the best RS as far as the
- * weights is concerned. If this is another one, some DS must disconnect.
- */
- return computeBestServerForWeight(bestServers, rsServerId, localServerId);
- }
-
- /**
- * If the filtered Map is not empty then it is returned, else return the
- * original unfiltered Map.
- *
- * @return the best fit Map between the filtered Map and the original
- * unfiltered Map.
- */
- private static <K, V> Map<K, V> keepBest(Map<K, V> filteredMap,
- Map<K, V> unfilteredMap)
- {
- if (!filteredMap.isEmpty())
+ else
{
- return filteredMap;
+ /*
+ * We are already connected to a RS: compute the best RS as far as the
+ * weights is concerned. If this is another one, some DS must disconnect.
+ */
+ computeBestServerForWeight(evals, rsServerId, localServerId);
}
- return unfilteredMap;
+ return evals;
}
/**
* Creates a new list that contains only replication servers that are locally
* configured.
- * @param bestServers The list of replication servers to filter
- * @return The sub list of replication servers locally configured
+ * @param evals The evaluation object
*/
- private static Map<Integer, ReplicationServerInfo>
- filterServersLocallyConfigured(Map<Integer,
- ReplicationServerInfo> bestServers)
+ private static void filterServersLocallyConfigured(RSEvaluations evals,
+ int localServerId)
{
- Map<Integer, ReplicationServerInfo> result =
- new HashMap<Integer, ReplicationServerInfo>();
- for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
+ final LocalEvaluation eval = new LocalEvaluation();
+ for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
{
- ReplicationServerInfo rsInfo = entry.getValue();
+ final Integer rsId = entry.getKey();
+ final ReplicationServerInfo rsInfo = entry.getValue();
if (rsInfo.isLocallyConfigured())
{
- result.put(entry.getKey(), rsInfo);
+ eval.accept(rsId, rsInfo);
+ }
+ else
+ {
+ eval.reject(rsInfo,
+ NOTE_RS_NOT_LOCALLY_CONFIGURED.get(rsId, localServerId));
}
}
- return result;
+ evals.keepBest(eval);
}
/**
* Creates a new list that contains only replication servers that have the
* passed group id, from a passed replication server list.
- * @param bestServers The list of replication servers to filter
+ * @param evals The evaluation object
* @param groupId The group id that must match
- * @return The sub list of replication servers matching the requested group id
- * (which may be empty)
*/
- private static Map<Integer, ReplicationServerInfo>
- filterServersWithSameGroupId(Map<Integer,
- ReplicationServerInfo> bestServers, byte groupId)
+ private static void filterServersWithSameGroupId(RSEvaluations evals,
+ int localServerId, byte groupId)
{
- Map<Integer, ReplicationServerInfo> result =
- new HashMap<Integer, ReplicationServerInfo>();
- for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
+ final LocalEvaluation eval = new LocalEvaluation();
+ for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
{
- ReplicationServerInfo rsInfo = entry.getValue();
+ final Integer rsId = entry.getKey();
+ final ReplicationServerInfo rsInfo = entry.getValue();
if (rsInfo.getGroupId() == groupId)
{
- result.put(entry.getKey(), rsInfo);
+ eval.accept(rsId, rsInfo);
+ }
+ else
+ {
+ eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS.get(
+ rsId, rsInfo.getGroupId(), localServerId, groupId));
}
}
- return result;
+ evals.keepBest(eval);
}
/**
@@ -1510,28 +1683,37 @@
* then the 'empty'(generationId==-1) replication servers are also included
* in the result list.
*
- * @param bestServers The list of replication servers to filter
+ * @param evals The evaluation object
* @param generationId The generation id that must match
- * @return The sub list of replication servers matching the requested
- * generation id (which may be empty)
+ * @return whether some replication server passed the filter
*/
- private static Map<Integer, ReplicationServerInfo>
- filterServersWithSameGenerationId(Map<Integer,
- ReplicationServerInfo> bestServers, long generationId)
+ private static boolean filterServersWithSameGenerationId(
+ RSEvaluations evals, long localServerId, long generationId)
{
- Map<Integer, ReplicationServerInfo> result =
- new HashMap<Integer, ReplicationServerInfo>();
+ final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
+ final LocalEvaluation eval = new LocalEvaluation();
boolean emptyState = true;
for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
{
- ReplicationServerInfo rsInfo = entry.getValue();
+ final Integer rsId = entry.getKey();
+ final ReplicationServerInfo rsInfo = entry.getValue();
if (rsInfo.getGenerationId() == generationId)
{
- result.put(entry.getKey(), rsInfo);
+ eval.accept(rsId, rsInfo);
if (!rsInfo.serverState.isEmpty())
emptyState = false;
}
+ else if (rsInfo.getGenerationId() == -1)
+ {
+ eval.reject(rsInfo, NOTE_RS_HAS_NO_GENERATION_ID.get(rsId,
+ generationId, localServerId));
+ }
+ else
+ {
+ eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS.get(
+ rsId, rsInfo.getGenerationId(), localServerId, generationId));
+ }
}
if (emptyState)
@@ -1543,38 +1725,27 @@
ReplicationServerInfo rsInfo = entry.getValue();
if (rsInfo.getGenerationId() == -1)
{
- result.put(entry.getKey(), rsInfo);
+ // will undo the reject of previously rejected RSs
+ eval.accept(entry.getKey(), rsInfo);
}
}
}
- return result;
+
+ return evals.keepBest(eval);
}
/**
* Creates a new list that contains only replication servers that have the
* latest changes from the passed DS, from a passed replication server list.
- * @param bestServers The list of replication servers to filter
+ * @param evals The evaluation object
* @param localState The state of the local DS
* @param localServerId The server id to consider for the changes
- * @return The sub list of replication servers that have the latest changes
- * from the passed DS (which may be empty)
*/
- private static Map<Integer, ReplicationServerInfo>
- filterServersWithAllLocalDSChanges(Map<Integer,
- ReplicationServerInfo> bestServers, ServerState localState,
- int localServerId)
+ private static void filterServersWithAllLocalDSChanges(
+ RSEvaluations evals, ServerState localState, int localServerId)
{
- Map<Integer, ReplicationServerInfo> upToDateServers =
- new HashMap<Integer, ReplicationServerInfo>();
- Map<Integer, ReplicationServerInfo> moreUpToDateServers =
- new HashMap<Integer, ReplicationServerInfo>();
-
// Extract the CSN of the latest change generated by the local server
- CSN myCSN = localState.getCSN(localServerId);
- if (myCSN == null)
- {
- myCSN = new CSN(0, 0, localServerId);
- }
+ final CSN localCSN = getCSN(localState, localServerId);
/**
* Find replication servers that are up to date (or more up to date than us,
@@ -1583,60 +1754,96 @@
* server id. If some servers are more up to date, prefer this list but take
* only the latest CSN.
*/
+ final LocalEvaluation mostUpToDateEval = new LocalEvaluation();
+ boolean foundRSMoreUpToDateThanLocalDS = false;
CSN latestRsCSN = null;
- for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
+ for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
{
final Integer rsId = entry.getKey();
final ReplicationServerInfo rsInfo = entry.getValue();
- CSN rsCSN = rsInfo.getServerState().getCSN(localServerId);
- if (rsCSN == null)
- {
- rsCSN = new CSN(0, 0, localServerId);
- }
+ final CSN rsCSN = getCSN(rsInfo.getServerState(), localServerId);
// Has this replication server the latest local change ?
- if (myCSN.isOlderThanOrEqualTo(rsCSN))
+ if (rsCSN.isOlderThan(localCSN))
{
- if (myCSN.equals(rsCSN))
+ mostUpToDateEval.reject(rsInfo, NOTE_RS_LATER_THAN_LOCAL_DS.get(
+ rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
+ }
+ else if (rsCSN.equals(localCSN))
+ {
+ // This replication server has exactly the latest change from the
+ // local server
+ if (!foundRSMoreUpToDateThanLocalDS)
{
- // This replication server has exactly the latest change from the
- // local server
- upToDateServers.put(rsId, rsInfo);
- } else
+ mostUpToDateEval.accept(rsId, rsInfo);
+ }
+ else
{
- // This replication server is even more up to date than the local
- // server
- if (latestRsCSN == null)
- {
- // Initialize the latest CSN
- latestRsCSN = rsCSN;
- }
- if (rsCSN.isNewerThanOrEqualTo(latestRsCSN))
- {
- if (rsCSN.equals(latestRsCSN))
- {
- moreUpToDateServers.put(rsId, rsInfo);
- } else
- {
- // This RS is even more up to date, clear the list and store this
- // new RS
- moreUpToDateServers.clear();
- moreUpToDateServers.put(rsId, rsInfo);
- latestRsCSN = rsCSN;
- }
- }
+ mostUpToDateEval.reject(rsInfo,
+ NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
+ rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
+ }
+ }
+ else if (rsCSN.isNewerThan(localCSN))
+ {
+ // This replication server is even more up to date than the local server
+ if (latestRsCSN == null)
+ {
+ foundRSMoreUpToDateThanLocalDS = true;
+ // all previous results are now outdated, reject them all
+ rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
+ localCSN);
+ // Initialize the latest CSN
+ latestRsCSN = rsCSN;
+ }
+
+ if (rsCSN.equals(latestRsCSN))
+ {
+ mostUpToDateEval.accept(rsId, rsInfo);
+ }
+ else if (rsCSN.isNewerThan(latestRsCSN))
+ {
+ // This RS is even more up to date, reject all previously accepted RSs
+ // and store this new RS
+ rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
+ localCSN);
+ mostUpToDateEval.accept(rsId, rsInfo);
+ latestRsCSN = rsCSN;
+ }
+ else
+ {
+ mostUpToDateEval.reject(rsInfo,
+ NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
+ rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
}
}
}
- if (moreUpToDateServers.size() > 0)
- {
- // Prefer servers more up to date than local server
- return moreUpToDateServers;
- }
- return upToDateServers;
+ evals.keepBest(mostUpToDateEval);
}
+ private static CSN getCSN(ServerState localState, int localServerId)
+ {
+ final CSN csn = localState.getCSN(localServerId);
+ if (csn != null)
+ {
+ return csn;
+ }
+ return new CSN(0, 0, localServerId);
+ }
+ private static void rejectAllWithRSIsLaterThanBestRS(
+ final LocalEvaluation eval, int localServerId, CSN localCSN)
+ {
+ for (ReplicationServerInfo rsInfo : eval.getAccepted().values())
+ {
+ final String rsCSN =
+ getCSN(rsInfo.getServerState(), localServerId).toStringUI();
+ final Message reason =
+ NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
+ rsInfo.getServerId(), rsCSN, localServerId, localCSN.toStringUI());
+ eval.reject(rsInfo, reason);
+ }
+ }
/**
* Creates a new list that contains only replication servers that are on the
@@ -1644,22 +1851,18 @@
* method will gives priority to any replication server which is in the same
* VM as this DS.
*
- * @param bestServers
- * The list of replication servers to filter
- * @return The sub list of replication servers being on the same host as the
- * local DS (which may be empty)
+ * @param evals The evaluation object
*/
- private static Map<Integer, ReplicationServerInfo> filterServersOnSameHost(
- Map<Integer, ReplicationServerInfo> bestServers)
+ private static void filterServersOnSameHost(RSEvaluations evals,
+ int localServerId)
{
/*
* Initially look for all servers on the same host. If we find one in the
* same VM, then narrow the search.
*/
boolean foundRSInSameVM = false;
- final Map<Integer, ReplicationServerInfo> result =
- new HashMap<Integer, ReplicationServerInfo>();
- for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
+ final LocalEvaluation eval = new LocalEvaluation();
+ for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
{
final Integer rsId = entry.getKey();
final ReplicationServerInfo rsInfo = entry.getValue();
@@ -1672,24 +1875,41 @@
{
// An RS in the same VM will always have priority.
// Narrow the search to only include servers in this VM.
- result.clear();
+ rejectAllWithRSOnDifferentVMThanDS(eval, localServerId);
foundRSInSameVM = true;
}
- result.put(rsId, rsInfo);
+ eval.accept(rsId, rsInfo);
}
else if (!foundRSInSameVM)
{
// OK, accept RSs on the same machine because we have not found an RS
// in the same VM yet
- result.put(rsId, rsInfo);
+ eval.accept(rsId, rsInfo);
}
else
{
// Skip: we have found some RSs in the same VM, but this RS is not.
+ eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(rsId,
+ localServerId));
}
}
+ else
+ {
+ eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_HOST_THAN_DS.get(rsId,
+ localServerId));
+ }
}
- return result;
+ evals.keepBest(eval);
+ }
+
+ private static void rejectAllWithRSOnDifferentVMThanDS(LocalEvaluation eval,
+ int localServerId)
+ {
+ for (ReplicationServerInfo rsInfo : eval.getAccepted().values())
+ {
+ eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(
+ rsInfo.getServerId(), localServerId));
+ }
}
/**
@@ -1699,23 +1919,18 @@
* Warning: This method is expected to be called with at least 2 servers in
* bestServers
* Note: this method is static for test purpose (access from unit tests)
- * @param bestServers The list of replication servers to consider
+ * @param evals The evaluation object
* @param currentRsServerId The replication server the local server is
* currently connected to. -1 if the local server is not yet connected
* to any replication server.
* @param localServerId The server id of the local server. This is not used
* when it is not connected to a replication server
* (currentRsServerId = -1)
- * @return The replication server the local server should be connected to
- * as far as the weight is concerned. This may be the currently used one if
- * the weight is correctly spread. If the returned value is null, the best
- * replication server is undetermined but the local server must disconnect
- * (so the best replication server is another one than the current one).
*/
- public static ReplicationServerInfo computeBestServerForWeight(
- Map<Integer, ReplicationServerInfo> bestServers, int currentRsServerId,
- int localServerId)
+ public static void computeBestServerForWeight(RSEvaluations evals,
+ int currentRsServerId, int localServerId)
{
+ final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
/*
* - Compute the load goal of each RS, deducing it from the weights affected
* to them.
@@ -1765,18 +1980,21 @@
{
// The local server is not connected yet, find best server to connect to,
// taking the weights into account.
- return computeBestServerWhenNotConnected(bestServers, loadDistances);
+ computeBestServerWhenNotConnected(evals, loadDistances, localServerId);
}
- // The local server is currently connected to a RS, let's see if it must
- // disconnect or not, taking the weights into account.
- return computeBestServerWhenConnected(bestServers, loadDistances,
- localServerId, currentRsServerId, sumOfWeights, sumOfConnectedDSs);
+ else
+ {
+ // The local server is currently connected to a RS, let's see if it must
+ // disconnect or not, taking the weights into account.
+ computeBestServerWhenConnected(evals, loadDistances, localServerId,
+ currentRsServerId, sumOfWeights, sumOfConnectedDSs);
+ }
}
- private static ReplicationServerInfo computeBestServerWhenNotConnected(
- Map<Integer, ReplicationServerInfo> bestServers,
- Map<Integer, BigDecimal> loadDistances)
+ private static void computeBestServerWhenNotConnected(RSEvaluations evals,
+ Map<Integer, BigDecimal> loadDistances, int localServerId)
{
+ final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
/*
* Find the server with the current highest distance to its load goal and
* choose it. Make an exception if every server is correctly balanced,
@@ -1815,14 +2033,15 @@
// Choose server with the highest weight
bestRsId = highestWeightRsId;
}
- return bestServers.get(bestRsId);
+ evals.setBestRS(bestRsId, NOTE_BIGGEST_WEIGHT_RS.get(localServerId,
+ bestRsId));
}
- private static ReplicationServerInfo computeBestServerWhenConnected(
- Map<Integer, ReplicationServerInfo> bestServers,
+ private static void computeBestServerWhenConnected(RSEvaluations evals,
Map<Integer, BigDecimal> loadDistances, int localServerId,
int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
{
+ final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
float currentLoadDistance =
loadDistances.get(currentRsServerId).floatValue();
@@ -1928,45 +2147,67 @@
{
// Avoid the yoyo effect, and keep the local DS connected to its
// current RS
- return bestServers.get(currentRsServerId);
+ evals.setBestRS(currentRsServerId,
+ NOTE_AVOID_YOYO_EFFECT.get(localServerId, currentRsServerId));
+ return;
}
}
- // Prepare a sorted list (from lowest to highest) or DS server ids
- // connected to the current RS
ReplicationServerInfo currentRsInfo =
- bestServers.get(currentRsServerId);
- List<Integer> serversConnectedToCurrentRS =
- new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
- Collections.sort(serversConnectedToCurrentRS);
-
- // Go through the list of DSs to disconnect and see if the local
- // server is part of them.
- int index = 0;
- while (overloadingDSsNumber > 0)
+ bestServers.get(currentRsServerId);
+ if (isServerOverloadingRS(localServerId, currentRsInfo,
+ overloadingDSsNumber))
{
- int serverIdToDisconnect = serversConnectedToCurrentRS.get(index);
- if (serverIdToDisconnect == localServerId)
- {
- // The local server is part of the DSs to disconnect
- return null;
- }
- overloadingDSsNumber--;
- index++;
+ // The local server is part of the DSs to disconnect
+ evals.discardAll(NOTE_DISCONNECT_DS_FROM_OVERLOADED_RS.get(
+ localServerId, currentRsServerId));
}
-
- // The local server is not part of the servers to disconnect from the
- // current RS.
+ else
+ {
+ // The local server is not part of the servers to disconnect from the
+ // current RS.
+ evals.setBestRS(currentRsServerId,
+ NOTE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS.get(localServerId,
+ currentRsServerId));
+ }
} else {
// The average distance of the other RSs does not show a lack of DSs:
// no need to disconnect any DS from the current RS.
+ evals.setBestRS(currentRsServerId,
+ NOTE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS.get(localServerId,
+ currentRsServerId));
}
} else {
// The RS load goal is reached or there are not enough DSs connected to
// it to reach it: do not disconnect from this RS and return rsInfo for
// this RS
+ evals.setBestRS(currentRsServerId,
+ NOTE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS.get(localServerId,
+ currentRsServerId));
}
- return bestServers.get(currentRsServerId);
+ }
+
+ /**
+ * Returns whether the local DS is overloading the RS.
+ * <p>
+ * There are an "overloadingDSsNumber" of DS overloading the RS. The list of
+ * DSs connected to this RS is ordered by serverId to use a consistent
+ * ordering across all nodes in the topology. The serverIds which index in the
+ * List are lower than "overloadingDSsNumber" will be evicted first.
+ * <p>
+ * This ordering is unfair since nodes with the lower serverIds will be
+ * evicted more often than nodes with higher serverIds. However, it is a
+ * consistent and reliable ordering applicable anywhere in the topology.
+ */
+ private static boolean isServerOverloadingRS(int localServerId,
+ ReplicationServerInfo currentRsInfo, int overloadingDSsNumber)
+ {
+ List<Integer> serversConnectedToCurrentRS =
+ new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
+ Collections.sort(serversConnectedToCurrentRS);
+
+ final int idx = serversConnectedToCurrentRS.indexOf(localServerId);
+ return idx != -1 && idx < overloadingDSsNumber;
}
/**
@@ -2069,8 +2310,8 @@
if (debugEnabled())
{
- TRACER.debugInfo(this + " end restart : connected=" + connected
- + " with RSid=" + getRsServerId() + " genid=" + this.generationID);
+ debugInfo("end restart : connected=" + connected + " with RS("
+ + getRsServerId() + ") genId=" + this.generationID);
}
}
@@ -2129,8 +2370,8 @@
if (debugEnabled())
{
- TRACER.debugInfo("ReplicationBroker.publish() Publishing a "
- + "message is not possible due to existing connection error.");
+ debugInfo("publish(): Publishing a message is not possible due to"
+ + " existing connection error.");
}
return false;
@@ -2232,8 +2473,8 @@
// ignore
if (debugEnabled())
{
- TRACER.debugInfo("ReplicationBroker.publish() "
- + "Interrupted exception raised : " + e.getLocalizedMessage());
+ debugInfo("publish(): Interrupted exception raised : "
+ + e.getLocalizedMessage());
}
}
}
@@ -2242,8 +2483,8 @@
// just loop.
if (debugEnabled())
{
- TRACER.debugInfo("ReplicationBroker.publish() "
- + "Interrupted exception raised." + e.getLocalizedMessage());
+ debugInfo("publish(): Interrupted exception raised."
+ + e.getLocalizedMessage());
}
}
}
@@ -2393,10 +2634,10 @@
{
// Stable topology (no topo msg since few seconds): proceed with
// best server checking.
- final ReplicationServerInfo bestServerInfo =
+ final RSEvaluations evals =
computeBestReplicationServer(false, previousRsServerID, state,
- replicationServerInfos, serverId, getGroupId(),
- generationID);
+ replicationServerInfos, serverId, getGroupId(), generationID);
+ final ReplicationServerInfo bestServerInfo = evals.getBestRS();
if (previousRsServerID != -1
&& (bestServerInfo == null
|| bestServerInfo.getServerId() != previousRsServerID))
@@ -2413,14 +2654,19 @@
}
else
{
- // TODO JNR log why an RS was evicted as best server
+ final int bestRsServerId = bestServerInfo.getServerId();
message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
serverId, previousRsServerID,
localSession.getReadableRemoteAddress(),
- bestServerInfo.getServerId(),
- baseDN.toNormalizedString());
+ bestRsServerId,
+ baseDN.toNormalizedString(),
+ evals.getEvaluation(previousRsServerID).toString(),
+ evals.getEvaluation(bestRsServerId).toString());
}
logError(message);
+ if (debugEnabled())
+ debugInfo("best replication servers evaluation results: "
+ + evals);
reStart(true);
}
@@ -2472,14 +2718,12 @@
}
/**
- * Gets the States of all the Replicas currently in the
- * Topology.
- * When this method is called, a Monitoring message will be sent
- * to the Replication Server to which this domain is currently connected
- * so that it computes a table containing information about
- * all Directory Servers in the topology.
- * This Computation involves communications will all the servers
- * currently connected and
+ * Gets the States of all the Replicas currently in the Topology. When this
+ * method is called, a Monitoring message will be sent to the Replication
+ * Server to which this domain is currently connected so that it computes a
+ * table containing information about all Directory Servers in the topology.
+ * This Computation involves communications will all the servers currently
+ * connected and
*
* @return The States of all Replicas in the topology (except us)
*/
@@ -2539,9 +2783,8 @@
public void stop()
{
if (debugEnabled())
- TRACER.debugInfo("ReplicationBroker " + getServerId() + " is stopping"
- + " and will close the connection to replication server "
- + rsServerId + " for domain " + getBaseDN());
+ debugInfo("is stopping and will close the connection to"
+ + " replication server " + rsServerId);
synchronized (startStopLock)
{
@@ -2786,7 +3029,7 @@
public void receiveTopo(TopologyMsg topoMsg)
{
if (debugEnabled())
- TRACER.debugInfo(this + " receive TopologyMsg=" + topoMsg);
+ debugInfo("receive TopologyMsg=" + topoMsg);
// Store new DS list
dsList = topoMsg.getDsList();
@@ -2876,8 +3119,7 @@
else
{
if (debugEnabled())
- TRACER.debugInfo(this
- + " is not configured to send CSN heartbeat interval");
+ debugInfo("is not configured to send CSN heartbeat interval");
}
}
@@ -3003,4 +3245,10 @@
}
return sb.toString();
}
+
+ private void debugInfo(String message)
+ {
+ TRACER.debugInfo(getClass().getSimpleName() + " for baseDN=" + getBaseDN()
+ + " and serverId=" + getServerId() + " " + message);
+ }
}
--
Gitblit v1.10.0