From 5c3362e4dd1ee79c10160cfd128b70cdb9a2dee1 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 28 Oct 2013 09:06:07 +0000
Subject: [PATCH] OPENDJ-1053 (CR-2520) Improve logging of replication load balancing and fail-over in order to diagnose the causes of these events
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ComputeBestServerTest.java | 263 +++++++++++----
opends/src/messages/messages/replication.properties | 35 +
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 722 ++++++++++++++++++++++++++++-------------
3 files changed, 708 insertions(+), 312 deletions(-)
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index dda3331..3ce2255 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -406,7 +406,8 @@
server will now try to connect to another replication server
NOTICE_NEW_BEST_REPLICATION_SERVER_188=Directory Server DS(%d) is switching \
from replication server RS(%d) at %s to RS(%d) for domain "%s" because it is \
- more suitable
+ more suitable. The previous replication server evaluation was: "%s", and the \
+ new replication server evaluation was: "%s"
NOTICE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START_141=Starting total update: \
importing domain "%s" from remote directory server DS(%d) to this directory \
server DS(%d)
@@ -495,4 +496,34 @@
after having successfully read the oldest. Database might have been cleaned or \
closed between successive reads
SEVERE_WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED_218=Cannot \
- open database %s because shutdown was requested from replication server RS(%d)
\ No newline at end of file
+ open database %s because shutdown was requested from replication server RS(%d)
+NOTICE_RS_NOT_LOCALLY_CONFIGURED_219=RS(%d) was not configured locally on DS(%d), \
+ but at least one other RS was
+NOTICE_RS_HAS_NO_GENERATION_ID_220=RS(%d) has no generation Id, but at least one \
+ other RS has the same generation Id %d as DS(%d)
+NOTICE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS_221=RS(%d) generation Id %d does not \
+ match DS(%d) generation Id %d, but at least another RS does
+NOTICE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS_222=RS(%d) groupId %d does not match \
+ DS(%d) groupId %d, but at least another RS does
+NOTICE_RS_LATER_THAN_LOCAL_DS_223=RS(%d) newest change %s is behind DS(%d) \
+ newest change %s, but at least another RS is at the same point or ahead of the DS
+NOTICE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS_224=RS(%d) newest \
+ change %s is behind another RS which is ahead of DS(%d) newest change %s
+NOTICE_RS_ON_DIFFERENT_VM_THAN_DS_225=RS(%d) is on the same host, but a different \
+ virtual machine than DS(%d), but at least another RS is
+NOTICE_RS_ON_DIFFERENT_HOST_THAN_DS_226=RS(%d) is on a different host than DS(%d), \
+ but at least another RS is on the same host
+NOTICE_DISCONNECT_DS_FROM_OVERLOADED_RS_227=DS(%d) disconnected from overloaded RS(%d)
+NOTICE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS_228=DS(%d) not disconnected from \
+ overloaded RS(%d), other DSs will disconnect
+NOTICE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS_229=DS(%d) not disconnected from \
+ current RS(%d), since there is no need to rebalance all directory servers to \
+ other replication servers in the topology.
+NOTICE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS_230=DS(%d) not disconnected \
+ from current RS(%d), because RS is underloaded or its load goal is reached
+NOTICE_BIGGEST_WEIGHT_RS_231=DS(%d) will connect to RS(%d) because it has the \
+ biggest weight among all the replication servers
+NOTICE_AVOID_YOYO_EFFECT_232=DS(%d) stayed connected to RS(%d) to avoid the yoyo effect
+NOTICE_BEST_RS_233=RS(%d) has been evaluated to be the best replication server \
+ for DS(%d) to connect to because it was the only one standing after all tests
+NOTICE_UNKNOWN_RS_234=RS(%d) could not be contacted by DS(%d)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 1072f55..e755741 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -806,9 +806,8 @@
* out which one is the best to connect to.
*/
if (debugEnabled())
- TRACER.debugInfo("serverId: " + serverId
- + " phase 1 : will perform PhaseOneH with each RS in "
- + " order to elect the preferred one");
+ debugInfo("phase 1 : will perform PhaseOneH with each RS in order to "
+ + "elect the preferred one");
// Get info from every available replication servers
replicationServerInfos = collectReplicationServersInfo();
@@ -818,14 +817,14 @@
if (replicationServerInfos.size() > 0)
{
// At least one server answered, find the best one.
- electedRsInfo = computeBestReplicationServer(true, -1, state,
- replicationServerInfos, serverId, getGroupId(), getGenerationID());
+ RSEvaluations evals = computeBestReplicationServer(true, -1, state,
+ replicationServerInfos, serverId, getGroupId(), getGenerationID());
+ electedRsInfo = evals.getBestRS();
// Best found, now initialize connection to this one (handshake phase 1)
if (debugEnabled())
- TRACER.debugInfo("serverId: " + serverId
- + " phase 2 : will perform PhaseOneH with the preferred RS="
- + electedRsInfo);
+ debugInfo("phase 2 : will perform PhaseOneH with the preferred RS="
+ + electedRsInfo);
electedRsInfo = performPhaseOneHandshake(
electedRsInfo.getServerURL(), true, false);
@@ -1056,8 +1055,7 @@
int nChanges = ServerState.diffChanges(rsState, state);
if (debugEnabled())
{
- TRACER.debugInfo("RB for dn " + getBaseDN() + " and with server id "
- + getServerId() + " computed " + nChanges + " changes late.");
+ debugInfo("computed " + nChanges + " changes late.");
}
/*
@@ -1136,8 +1134,8 @@
ReplicationMsg msg = localSession.receive();
if (debugEnabled())
{
- TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
- + serverStartMsg + "\nAND RECEIVED:\n" + msg);
+ debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n"
+ + msg);
}
// Wrap received message in a server info object
@@ -1253,8 +1251,7 @@
// FIXME ECL In the handshake phase two, should RS send back a topo msg ?
if (debugEnabled())
{
- TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
- + startECLSessionMsg);
+ TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg);
}
// Alright set the timeout to the desired value
@@ -1320,8 +1317,8 @@
if (debugEnabled())
{
- TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n"
- + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg);
+ TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg
+ + "\nAND RECEIVED:\n" + topologyMsg);
}
// Alright set the timeout to the desired value
@@ -1342,13 +1339,203 @@
}
/**
+ * Class holding evaluation results for electing the best replication server
+ * for the local directory server.
+ */
+ static class RSEvaluations
+ {
+
+ private final int localServerId;
+ private Map<Integer, ReplicationServerInfo> bestRSs;
+ private final Map<Integer, Message> rsEvals =
+ new HashMap<Integer, Message>();
+
+ /**
+ * Creates an evaluation where all the supplied servers are candidates.
+ *
+ * @param localServerId
+ * the serverId for the local directory server
+ * @param rsInfos
+ * a Map of serverId => {@link ReplicationServerInfo} with all the
+ * candidate replication servers
+ */
+ RSEvaluations(int localServerId,
+ Map<Integer, ReplicationServerInfo> rsInfos)
+ {
+ this.localServerId = localServerId;
+ this.bestRSs = rsInfos;
+ }
+
+ private boolean keepBest(LocalEvaluation eval)
+ {
+ if (eval.hasAcceptedAny())
+ {
+ bestRSs = eval.getAccepted();
+ rsEvals.putAll(eval.getRejected());
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Sets the elected replication server and discards the other candidates.
+ * The supplied evaluation is recorded for all of them, elected RS included.
+ *
+ * @param bestRsId
+ * the serverId of the elected replication server
+ * @param rejectedRSsEval
+ * the evaluation recorded for every candidate, elected one included
+ */
+ private void setBestRS(int bestRsId, Message rejectedRSsEval)
+ {
+ for (Iterator<Entry<Integer, ReplicationServerInfo>> it =
+ this.bestRSs.entrySet().iterator(); it.hasNext();)
+ {
+ final Entry<Integer, ReplicationServerInfo> entry = it.next();
+ final Integer rsId = entry.getKey();
+ final ReplicationServerInfo rsInfo = entry.getValue();
+ if (rsInfo.getServerId() != bestRsId)
+ {
+ it.remove();
+ }
+ rsEvals.put(rsId, rejectedRSsEval);
+ }
+ }
+
+ private void discardAll(Message eval)
+ {
+ for (Integer rsId : bestRSs.keySet())
+ {
+ rsEvals.put(rsId, eval);
+ }
+ }
+
+ private boolean foundBestRS()
+ {
+ return bestRSs.size() == 1;
+ }
+
+ /**
+ * Returns the {@link ReplicationServerInfo} for the best replication
+ * server.
+ *
+ * @return the {@link ReplicationServerInfo} for the best replication server
+ */
+ ReplicationServerInfo getBestRS()
+ {
+ if (foundBestRS())
+ {
+ return bestRSs.values().iterator().next();
+ }
+ return null;
+ }
+
+ /**
+ * Returns the evaluations for all the candidate replication servers.
+ *
+ * @return a Map of serverId => Message containing the evaluation for each
+ * candidate replication server.
+ */
+ Map<Integer, Message> getEvaluations()
+ {
+ if (foundBestRS())
+ {
+ final Integer bestRSServerId = getBestRS().getServerId();
+ if (rsEvals.get(bestRSServerId) == null)
+ {
+ final Message eval = NOTE_BEST_RS.get(bestRSServerId, localServerId);
+ rsEvals.put(bestRSServerId, eval);
+ }
+ }
+ return Collections.unmodifiableMap(rsEvals);
+ }
+
+ /**
+ * Returns the evaluation for the supplied replication server Id.
+ * <p>
+ * Note: "unknown RS" message is returned if the supplied replication server
+ * was not part of the candidate replication servers.
+ *
+ * @param rsServerId
+ * the supplied replication server Id
+ * @return the evaluation {@link Message} for the supplied replication
+ * server Id
+ */
+ private Message getEvaluation(int rsServerId)
+ {
+ final Message evaluation = getEvaluations().get(rsServerId);
+ if (evaluation != null)
+ {
+ return evaluation;
+ }
+ return NOTE_UNKNOWN_RS.get(rsServerId, localServerId);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return new StringBuilder()
+ .append("Current best replication server Ids: ").append(
+ this.bestRSs.keySet()).append(
+ ", Evaluation of connected replication servers").append(
+ " (ServerId => Evaluation): ").append(this.rsEvals.keySet())
+ .append(", Any replication server not appearing here").append(
+ " could not be contacted.").toString();
+ }
+ }
+
+ /**
+ * Evaluation local to one filter.
+ */
+ private static class LocalEvaluation
+ {
+ private final Map<Integer, ReplicationServerInfo> filteredRSs =
+ new HashMap<Integer, ReplicationServerInfo>();
+ private final Map<ReplicationServerInfo, Message> rsEvals =
+ new HashMap<ReplicationServerInfo, Message>();
+
+ private void accept(Integer rsId, ReplicationServerInfo rsInfo)
+ {
+ this.rsEvals.remove(rsInfo); // undo reject
+ this.filteredRSs.put(rsId, rsInfo);
+ }
+
+ private void reject(ReplicationServerInfo rsInfo, Message reason)
+ {
+ this.filteredRSs.remove(rsInfo.getServerId()); // undo accept
+ this.rsEvals.put(rsInfo, reason);
+ }
+
+ private Map<Integer, ReplicationServerInfo> getAccepted()
+ {
+ return filteredRSs;
+ }
+
+ public Map<Integer, Message> getRejected()
+ {
+ final Map<Integer, Message> result = new HashMap<Integer, Message>();
+ for (Entry<ReplicationServerInfo, Message> entry : rsEvals.entrySet())
+ {
+ result.put(entry.getKey().getServerId(), entry.getValue());
+ }
+ return result;
+ }
+
+ private boolean hasAcceptedAny()
+ {
+ return !filteredRSs.isEmpty();
+ }
+
+ }
+
+ /**
* Returns the replication server that best fits our need so that we can
* connect to it or determine if we must disconnect from current one to
* re-connect to best server.
- *
+ * <p>
* Note: this method is static for test purpose (access from unit tests)
*
- *
* @param firstConnection True if we run this method for the very first
* connection of the broker. False if we run this method to determine if the
* replication server we are currently connected to is still the best or not.
@@ -1365,15 +1552,16 @@
* disconnect (so the best replication server is another one than the current
* one). Null can only be returned when firstConnection is false.
*/
- public static ReplicationServerInfo computeBestReplicationServer(
+ public static RSEvaluations computeBestReplicationServer(
boolean firstConnection, int rsServerId, ServerState myState,
Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
byte groupId, long generationId)
{
+ final RSEvaluations evals = new RSEvaluations(localServerId, rsInfos);
// Shortcut, if only one server, this is the best
- if (rsInfos.size() == 1)
+ if (evals.foundBestRS())
{
- return rsInfos.values().iterator().next();
+ return evals;
}
/**
@@ -1386,8 +1574,6 @@
* local DS
* - replication server in the same VM as local DS one
*/
- // TODO JNR log why an RS was evicted as best server
- Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
/*
The list of best replication servers is filtered with each criteria. At
each criteria, the list is replaced with the filtered one if there
@@ -1398,109 +1584,96 @@
the local configuration. When the current method is called, for
sure, at least one server from the list is locally configured
*/
- bestServers =
- keepBest(filterServersLocallyConfigured(bestServers), bestServers);
+ filterServersLocallyConfigured(evals, localServerId);
// Some servers with same group id ?
- bestServers =
- keepBest(filterServersWithSameGroupId(bestServers, groupId),
- bestServers);
+ filterServersWithSameGroupId(evals, localServerId, groupId);
// Some servers with same generation id ?
- Map<Integer, ReplicationServerInfo> sameGenerationId =
- filterServersWithSameGenerationId(bestServers, generationId);
- if (sameGenerationId.size() > 0)
+ final boolean rssWithSameGenerationIdExist =
+ filterServersWithSameGenerationId(evals, localServerId, generationId);
+ if (rssWithSameGenerationIdExist)
{
// If some servers with the right generation id this is useful to
// run the local DS change criteria
- bestServers =
- keepBest(filterServersWithAllLocalDSChanges(sameGenerationId,
- myState, localServerId), sameGenerationId);
+ filterServersWithAllLocalDSChanges(evals, myState, localServerId);
}
// Some servers in the local VM or local host?
- bestServers = keepBest(filterServersOnSameHost(bestServers), bestServers);
+ filterServersOnSameHost(evals, localServerId);
- /**
- * Now apply the choice base on the weight to the best servers list
- */
- if (bestServers.size() == 1)
+ if (evals.foundBestRS())
{
- return bestServers.values().iterator().next();
+ return evals;
}
+ /**
+ * Now apply the choice based on the weight to the best servers list
+ */
if (firstConnection)
{
// We are not connected to a server yet
- return computeBestServerForWeight(bestServers, -1, -1);
+ computeBestServerForWeight(evals, -1, -1);
}
- /*
- * We are already connected to a RS: compute the best RS as far as the
- * weights is concerned. If this is another one, some DS must disconnect.
- */
- return computeBestServerForWeight(bestServers, rsServerId, localServerId);
- }
-
- /**
- * If the filtered Map is not empty then it is returned, else return the
- * original unfiltered Map.
- *
- * @return the best fit Map between the filtered Map and the original
- * unfiltered Map.
- */
- private static <K, V> Map<K, V> keepBest(Map<K, V> filteredMap,
- Map<K, V> unfilteredMap)
- {
- if (!filteredMap.isEmpty())
+ else
{
- return filteredMap;
+ /*
+ * We are already connected to a RS: compute the best RS as far as the
+ * weights are concerned. If this is another one, some DS must disconnect.
+ */
+ computeBestServerForWeight(evals, rsServerId, localServerId);
}
- return unfilteredMap;
+ return evals;
}
/**
* Creates a new list that contains only replication servers that are locally
* configured.
- * @param bestServers The list of replication servers to filter
- * @return The sub list of replication servers locally configured
+ * @param evals The evaluation object
*/
- private static Map<Integer, ReplicationServerInfo>
- filterServersLocallyConfigured(Map<Integer,
- ReplicationServerInfo> bestServers)
+ private static void filterServersLocallyConfigured(RSEvaluations evals,
+ int localServerId)
{
- Map<Integer, ReplicationServerInfo> result =
- new HashMap<Integer, ReplicationServerInfo>();
- for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
+ final LocalEvaluation eval = new LocalEvaluation();
+ for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
{
- ReplicationServerInfo rsInfo = entry.getValue();
+ final Integer rsId = entry.getKey();
+ final ReplicationServerInfo rsInfo = entry.getValue();
if (rsInfo.isLocallyConfigured())
{
- result.put(entry.getKey(), rsInfo);
+ eval.accept(rsId, rsInfo);
+ }
+ else
+ {
+ eval.reject(rsInfo,
+ NOTE_RS_NOT_LOCALLY_CONFIGURED.get(rsId, localServerId));
}
}
- return result;
+ evals.keepBest(eval);
}
/**
* Creates a new list that contains only replication servers that have the
* passed group id, from a passed replication server list.
- * @param bestServers The list of replication servers to filter
+ * @param evals The evaluation object
* @param groupId The group id that must match
- * @return The sub list of replication servers matching the requested group id
- * (which may be empty)
*/
- private static Map<Integer, ReplicationServerInfo>
- filterServersWithSameGroupId(Map<Integer,
- ReplicationServerInfo> bestServers, byte groupId)
+ private static void filterServersWithSameGroupId(RSEvaluations evals,
+ int localServerId, byte groupId)
{
- Map<Integer, ReplicationServerInfo> result =
- new HashMap<Integer, ReplicationServerInfo>();
- for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
+ final LocalEvaluation eval = new LocalEvaluation();
+ for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
{
- ReplicationServerInfo rsInfo = entry.getValue();
+ final Integer rsId = entry.getKey();
+ final ReplicationServerInfo rsInfo = entry.getValue();
if (rsInfo.getGroupId() == groupId)
{
- result.put(entry.getKey(), rsInfo);
+ eval.accept(rsId, rsInfo);
+ }
+ else
+ {
+ eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS.get(
+ rsId, rsInfo.getGroupId(), localServerId, groupId));
}
}
- return result;
+ evals.keepBest(eval);
}
/**
@@ -1510,28 +1683,37 @@
* then the 'empty'(generationId==-1) replication servers are also included
* in the result list.
*
- * @param bestServers The list of replication servers to filter
+ * @param evals The evaluation object
* @param generationId The generation id that must match
- * @return The sub list of replication servers matching the requested
- * generation id (which may be empty)
+ * @return whether some replication server passed the filter
*/
- private static Map<Integer, ReplicationServerInfo>
- filterServersWithSameGenerationId(Map<Integer,
- ReplicationServerInfo> bestServers, long generationId)
+ private static boolean filterServersWithSameGenerationId(
+ RSEvaluations evals, long localServerId, long generationId)
{
- Map<Integer, ReplicationServerInfo> result =
- new HashMap<Integer, ReplicationServerInfo>();
+ final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
+ final LocalEvaluation eval = new LocalEvaluation();
boolean emptyState = true;
for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
{
- ReplicationServerInfo rsInfo = entry.getValue();
+ final Integer rsId = entry.getKey();
+ final ReplicationServerInfo rsInfo = entry.getValue();
if (rsInfo.getGenerationId() == generationId)
{
- result.put(entry.getKey(), rsInfo);
+ eval.accept(rsId, rsInfo);
if (!rsInfo.serverState.isEmpty())
emptyState = false;
}
+ else if (rsInfo.getGenerationId() == -1)
+ {
+ eval.reject(rsInfo, NOTE_RS_HAS_NO_GENERATION_ID.get(rsId,
+ generationId, localServerId));
+ }
+ else
+ {
+ eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS.get(
+ rsId, rsInfo.getGenerationId(), localServerId, generationId));
+ }
}
if (emptyState)
@@ -1543,38 +1725,27 @@
ReplicationServerInfo rsInfo = entry.getValue();
if (rsInfo.getGenerationId() == -1)
{
- result.put(entry.getKey(), rsInfo);
+ // will undo the reject of previously rejected RSs
+ eval.accept(entry.getKey(), rsInfo);
}
}
}
- return result;
+
+ return evals.keepBest(eval);
}
/**
* Creates a new list that contains only replication servers that have the
* latest changes from the passed DS, from a passed replication server list.
- * @param bestServers The list of replication servers to filter
+ * @param evals The evaluation object
* @param localState The state of the local DS
* @param localServerId The server id to consider for the changes
- * @return The sub list of replication servers that have the latest changes
- * from the passed DS (which may be empty)
*/
- private static Map<Integer, ReplicationServerInfo>
- filterServersWithAllLocalDSChanges(Map<Integer,
- ReplicationServerInfo> bestServers, ServerState localState,
- int localServerId)
+ private static void filterServersWithAllLocalDSChanges(
+ RSEvaluations evals, ServerState localState, int localServerId)
{
- Map<Integer, ReplicationServerInfo> upToDateServers =
- new HashMap<Integer, ReplicationServerInfo>();
- Map<Integer, ReplicationServerInfo> moreUpToDateServers =
- new HashMap<Integer, ReplicationServerInfo>();
-
// Extract the CSN of the latest change generated by the local server
- CSN myCSN = localState.getCSN(localServerId);
- if (myCSN == null)
- {
- myCSN = new CSN(0, 0, localServerId);
- }
+ final CSN localCSN = getCSN(localState, localServerId);
/**
* Find replication servers that are up to date (or more up to date than us,
@@ -1583,60 +1754,96 @@
* server id. If some servers are more up to date, prefer this list but take
* only the latest CSN.
*/
+ final LocalEvaluation mostUpToDateEval = new LocalEvaluation();
+ boolean foundRSMoreUpToDateThanLocalDS = false;
CSN latestRsCSN = null;
- for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
+ for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
{
final Integer rsId = entry.getKey();
final ReplicationServerInfo rsInfo = entry.getValue();
- CSN rsCSN = rsInfo.getServerState().getCSN(localServerId);
- if (rsCSN == null)
- {
- rsCSN = new CSN(0, 0, localServerId);
- }
+ final CSN rsCSN = getCSN(rsInfo.getServerState(), localServerId);
// Has this replication server the latest local change ?
- if (myCSN.isOlderThanOrEqualTo(rsCSN))
+ if (rsCSN.isOlderThan(localCSN))
{
- if (myCSN.equals(rsCSN))
+ mostUpToDateEval.reject(rsInfo, NOTE_RS_LATER_THAN_LOCAL_DS.get(
+ rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
+ }
+ else if (rsCSN.equals(localCSN))
+ {
+ // This replication server has exactly the latest change from the
+ // local server
+ if (!foundRSMoreUpToDateThanLocalDS)
{
- // This replication server has exactly the latest change from the
- // local server
- upToDateServers.put(rsId, rsInfo);
- } else
+ mostUpToDateEval.accept(rsId, rsInfo);
+ }
+ else
{
- // This replication server is even more up to date than the local
- // server
- if (latestRsCSN == null)
- {
- // Initialize the latest CSN
- latestRsCSN = rsCSN;
- }
- if (rsCSN.isNewerThanOrEqualTo(latestRsCSN))
- {
- if (rsCSN.equals(latestRsCSN))
- {
- moreUpToDateServers.put(rsId, rsInfo);
- } else
- {
- // This RS is even more up to date, clear the list and store this
- // new RS
- moreUpToDateServers.clear();
- moreUpToDateServers.put(rsId, rsInfo);
- latestRsCSN = rsCSN;
- }
- }
+ mostUpToDateEval.reject(rsInfo,
+ NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
+ rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
+ }
+ }
+ else if (rsCSN.isNewerThan(localCSN))
+ {
+ // This replication server is even more up to date than the local server
+ if (latestRsCSN == null)
+ {
+ foundRSMoreUpToDateThanLocalDS = true;
+ // all previous results are now outdated, reject them all
+ rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
+ localCSN);
+ // Initialize the latest CSN
+ latestRsCSN = rsCSN;
+ }
+
+ if (rsCSN.equals(latestRsCSN))
+ {
+ mostUpToDateEval.accept(rsId, rsInfo);
+ }
+ else if (rsCSN.isNewerThan(latestRsCSN))
+ {
+ // This RS is even more up to date, reject all previously accepted RSs
+ // and store this new RS
+ rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
+ localCSN);
+ mostUpToDateEval.accept(rsId, rsInfo);
+ latestRsCSN = rsCSN;
+ }
+ else
+ {
+ mostUpToDateEval.reject(rsInfo,
+ NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
+ rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
}
}
}
- if (moreUpToDateServers.size() > 0)
- {
- // Prefer servers more up to date than local server
- return moreUpToDateServers;
- }
- return upToDateServers;
+ evals.keepBest(mostUpToDateEval);
}
+ private static CSN getCSN(ServerState localState, int localServerId)
+ {
+ final CSN csn = localState.getCSN(localServerId);
+ if (csn != null)
+ {
+ return csn;
+ }
+ return new CSN(0, 0, localServerId);
+ }
+ private static void rejectAllWithRSIsLaterThanBestRS(
+ final LocalEvaluation eval, int localServerId, CSN localCSN)
+ {
+ for (ReplicationServerInfo rsInfo : eval.getAccepted().values())
+ {
+ final String rsCSN =
+ getCSN(rsInfo.getServerState(), localServerId).toStringUI();
+ final Message reason =
+ NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
+ rsInfo.getServerId(), rsCSN, localServerId, localCSN.toStringUI());
+ eval.reject(rsInfo, reason);
+ }
+ }
/**
* Creates a new list that contains only replication servers that are on the
@@ -1644,22 +1851,18 @@
* method will gives priority to any replication server which is in the same
* VM as this DS.
*
- * @param bestServers
- * The list of replication servers to filter
- * @return The sub list of replication servers being on the same host as the
- * local DS (which may be empty)
+ * @param evals The evaluation object
*/
- private static Map<Integer, ReplicationServerInfo> filterServersOnSameHost(
- Map<Integer, ReplicationServerInfo> bestServers)
+ private static void filterServersOnSameHost(RSEvaluations evals,
+ int localServerId)
{
/*
* Initially look for all servers on the same host. If we find one in the
* same VM, then narrow the search.
*/
boolean foundRSInSameVM = false;
- final Map<Integer, ReplicationServerInfo> result =
- new HashMap<Integer, ReplicationServerInfo>();
- for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
+ final LocalEvaluation eval = new LocalEvaluation();
+ for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
{
final Integer rsId = entry.getKey();
final ReplicationServerInfo rsInfo = entry.getValue();
@@ -1672,24 +1875,41 @@
{
// An RS in the same VM will always have priority.
// Narrow the search to only include servers in this VM.
- result.clear();
+ rejectAllWithRSOnDifferentVMThanDS(eval, localServerId);
foundRSInSameVM = true;
}
- result.put(rsId, rsInfo);
+ eval.accept(rsId, rsInfo);
}
else if (!foundRSInSameVM)
{
// OK, accept RSs on the same machine because we have not found an RS
// in the same VM yet
- result.put(rsId, rsInfo);
+ eval.accept(rsId, rsInfo);
}
else
{
// Skip: we have found some RSs in the same VM, but this RS is not.
+ eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(rsId,
+ localServerId));
}
}
+ else
+ {
+ eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_HOST_THAN_DS.get(rsId,
+ localServerId));
+ }
}
- return result;
+ evals.keepBest(eval);
+ }
+
+ private static void rejectAllWithRSOnDifferentVMThanDS(LocalEvaluation eval,
+ int localServerId)
+ {
+ for (ReplicationServerInfo rsInfo : eval.getAccepted().values())
+ {
+ eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(
+ rsInfo.getServerId(), localServerId));
+ }
}
/**
@@ -1699,23 +1919,18 @@
* Warning: This method is expected to be called with at least 2 servers in
* bestServers
* Note: this method is static for test purpose (access from unit tests)
- * @param bestServers The list of replication servers to consider
+ * @param evals The evaluation object
* @param currentRsServerId The replication server the local server is
* currently connected to. -1 if the local server is not yet connected
* to any replication server.
* @param localServerId The server id of the local server. This is not used
* when it is not connected to a replication server
* (currentRsServerId = -1)
- * @return The replication server the local server should be connected to
- * as far as the weight is concerned. This may be the currently used one if
- * the weight is correctly spread. If the returned value is null, the best
- * replication server is undetermined but the local server must disconnect
- * (so the best replication server is another one than the current one).
*/
- public static ReplicationServerInfo computeBestServerForWeight(
- Map<Integer, ReplicationServerInfo> bestServers, int currentRsServerId,
- int localServerId)
+ public static void computeBestServerForWeight(RSEvaluations evals,
+ int currentRsServerId, int localServerId)
{
+ final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
/*
* - Compute the load goal of each RS, deducing it from the weights affected
* to them.
@@ -1765,18 +1980,21 @@
{
// The local server is not connected yet, find best server to connect to,
// taking the weights into account.
- return computeBestServerWhenNotConnected(bestServers, loadDistances);
+ computeBestServerWhenNotConnected(evals, loadDistances, localServerId);
}
- // The local server is currently connected to a RS, let's see if it must
- // disconnect or not, taking the weights into account.
- return computeBestServerWhenConnected(bestServers, loadDistances,
- localServerId, currentRsServerId, sumOfWeights, sumOfConnectedDSs);
+ else
+ {
+ // The local server is currently connected to a RS, let's see if it must
+ // disconnect or not, taking the weights into account.
+ computeBestServerWhenConnected(evals, loadDistances, localServerId,
+ currentRsServerId, sumOfWeights, sumOfConnectedDSs);
+ }
}
- private static ReplicationServerInfo computeBestServerWhenNotConnected(
- Map<Integer, ReplicationServerInfo> bestServers,
- Map<Integer, BigDecimal> loadDistances)
+ private static void computeBestServerWhenNotConnected(RSEvaluations evals,
+ Map<Integer, BigDecimal> loadDistances, int localServerId)
{
+ final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
/*
* Find the server with the current highest distance to its load goal and
* choose it. Make an exception if every server is correctly balanced,
@@ -1815,14 +2033,15 @@
// Choose server with the highest weight
bestRsId = highestWeightRsId;
}
- return bestServers.get(bestRsId);
+ evals.setBestRS(bestRsId, NOTE_BIGGEST_WEIGHT_RS.get(localServerId,
+ bestRsId));
}
- private static ReplicationServerInfo computeBestServerWhenConnected(
- Map<Integer, ReplicationServerInfo> bestServers,
+ private static void computeBestServerWhenConnected(RSEvaluations evals,
Map<Integer, BigDecimal> loadDistances, int localServerId,
int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
{
+ final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
float currentLoadDistance =
loadDistances.get(currentRsServerId).floatValue();
@@ -1928,45 +2147,67 @@
{
// Avoid the yoyo effect, and keep the local DS connected to its
// current RS
- return bestServers.get(currentRsServerId);
+ evals.setBestRS(currentRsServerId,
+ NOTE_AVOID_YOYO_EFFECT.get(localServerId, currentRsServerId));
+ return;
}
}
- // Prepare a sorted list (from lowest to highest) or DS server ids
- // connected to the current RS
ReplicationServerInfo currentRsInfo =
- bestServers.get(currentRsServerId);
- List<Integer> serversConnectedToCurrentRS =
- new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
- Collections.sort(serversConnectedToCurrentRS);
-
- // Go through the list of DSs to disconnect and see if the local
- // server is part of them.
- int index = 0;
- while (overloadingDSsNumber > 0)
+ bestServers.get(currentRsServerId);
+ if (isServerOverloadingRS(localServerId, currentRsInfo,
+ overloadingDSsNumber))
{
- int serverIdToDisconnect = serversConnectedToCurrentRS.get(index);
- if (serverIdToDisconnect == localServerId)
- {
- // The local server is part of the DSs to disconnect
- return null;
- }
- overloadingDSsNumber--;
- index++;
+ // The local server is part of the DSs to disconnect
+ evals.discardAll(NOTE_DISCONNECT_DS_FROM_OVERLOADED_RS.get(
+ localServerId, currentRsServerId));
}
-
- // The local server is not part of the servers to disconnect from the
- // current RS.
+ else
+ {
+ // The local server is not part of the servers to disconnect from the
+ // current RS.
+ evals.setBestRS(currentRsServerId,
+ NOTE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS.get(localServerId,
+ currentRsServerId));
+ }
} else {
// The average distance of the other RSs does not show a lack of DSs:
// no need to disconnect any DS from the current RS.
+ evals.setBestRS(currentRsServerId,
+ NOTE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS.get(localServerId,
+ currentRsServerId));
}
} else {
// The RS load goal is reached or there are not enough DSs connected to
// it to reach it: do not disconnect from this RS and return rsInfo for
// this RS
+ evals.setBestRS(currentRsServerId,
+ NOTE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS.get(localServerId,
+ currentRsServerId));
}
- return bestServers.get(currentRsServerId);
+ }
+
+ /**
+ * Returns whether the local DS is overloading the RS.
+ * <p>
+ * There are an "overloadingDSsNumber" of DS overloading the RS. The list of
+ * DSs connected to this RS is ordered by serverId to use a consistent
+ * ordering across all nodes in the topology. The serverIds whose index in the
+ * List is lower than "overloadingDSsNumber" will be evicted first.
+ * <p>
+ * This ordering is unfair since nodes with the lower serverIds will be
+ * evicted more often than nodes with higher serverIds. However, it is a
+ * consistent and reliable ordering applicable anywhere in the topology.
+ */
+ private static boolean isServerOverloadingRS(int localServerId,
+ ReplicationServerInfo currentRsInfo, int overloadingDSsNumber)
+ {
+ List<Integer> serversConnectedToCurrentRS =
+ new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
+ Collections.sort(serversConnectedToCurrentRS);
+
+ final int idx = serversConnectedToCurrentRS.indexOf(localServerId);
+ return idx != -1 && idx < overloadingDSsNumber;
}
/**
@@ -2069,8 +2310,8 @@
if (debugEnabled())
{
- TRACER.debugInfo(this + " end restart : connected=" + connected
- + " with RSid=" + getRsServerId() + " genid=" + this.generationID);
+ debugInfo("end restart : connected=" + connected + " with RS("
+ + getRsServerId() + ") genId=" + this.generationID);
}
}
@@ -2129,8 +2370,8 @@
if (debugEnabled())
{
- TRACER.debugInfo("ReplicationBroker.publish() Publishing a "
- + "message is not possible due to existing connection error.");
+ debugInfo("publish(): Publishing a message is not possible due to"
+ + " existing connection error.");
}
return false;
@@ -2232,8 +2473,8 @@
// ignore
if (debugEnabled())
{
- TRACER.debugInfo("ReplicationBroker.publish() "
- + "Interrupted exception raised : " + e.getLocalizedMessage());
+ debugInfo("publish(): Interrupted exception raised : "
+ + e.getLocalizedMessage());
}
}
}
@@ -2242,8 +2483,8 @@
// just loop.
if (debugEnabled())
{
- TRACER.debugInfo("ReplicationBroker.publish() "
- + "Interrupted exception raised." + e.getLocalizedMessage());
+ debugInfo("publish(): Interrupted exception raised."
+ + e.getLocalizedMessage());
}
}
}
@@ -2393,10 +2634,10 @@
{
// Stable topology (no topo msg since few seconds): proceed with
// best server checking.
- final ReplicationServerInfo bestServerInfo =
+ final RSEvaluations evals =
computeBestReplicationServer(false, previousRsServerID, state,
- replicationServerInfos, serverId, getGroupId(),
- generationID);
+ replicationServerInfos, serverId, getGroupId(), generationID);
+ final ReplicationServerInfo bestServerInfo = evals.getBestRS();
if (previousRsServerID != -1
&& (bestServerInfo == null
|| bestServerInfo.getServerId() != previousRsServerID))
@@ -2413,14 +2654,19 @@
}
else
{
- // TODO JNR log why an RS was evicted as best server
+ final int bestRsServerId = bestServerInfo.getServerId();
message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
serverId, previousRsServerID,
localSession.getReadableRemoteAddress(),
- bestServerInfo.getServerId(),
- baseDN.toNormalizedString());
+ bestRsServerId,
+ baseDN.toNormalizedString(),
+ evals.getEvaluation(previousRsServerID).toString(),
+ evals.getEvaluation(bestRsServerId).toString());
}
logError(message);
+ if (debugEnabled())
+ debugInfo("best replication servers evaluation results: "
+ + evals);
reStart(true);
}
@@ -2472,14 +2718,12 @@
}
/**
- * Gets the States of all the Replicas currently in the
- * Topology.
- * When this method is called, a Monitoring message will be sent
- * to the Replication Server to which this domain is currently connected
- * so that it computes a table containing information about
- * all Directory Servers in the topology.
- * This Computation involves communications will all the servers
- * currently connected and
+ * Gets the States of all the Replicas currently in the Topology. When this
+ * method is called, a Monitoring message will be sent to the Replication
+ * Server to which this domain is currently connected so that it computes a
+ * table containing information about all Directory Servers in the topology.
+ * This computation involves communications with all the servers currently
+ * connected.
*
* @return The States of all Replicas in the topology (except us)
*/
@@ -2539,9 +2783,8 @@
public void stop()
{
if (debugEnabled())
- TRACER.debugInfo("ReplicationBroker " + getServerId() + " is stopping"
- + " and will close the connection to replication server "
- + rsServerId + " for domain " + getBaseDN());
+ debugInfo("is stopping and will close the connection to"
+ + " replication server " + rsServerId);
synchronized (startStopLock)
{
@@ -2786,7 +3029,7 @@
public void receiveTopo(TopologyMsg topoMsg)
{
if (debugEnabled())
- TRACER.debugInfo(this + " receive TopologyMsg=" + topoMsg);
+ debugInfo("receive TopologyMsg=" + topoMsg);
// Store new DS list
dsList = topoMsg.getDsList();
@@ -2876,8 +3119,7 @@
else
{
if (debugEnabled())
- TRACER.debugInfo(this
- + " is not configured to send CSN heartbeat interval");
+ debugInfo("is not configured to send CSN heartbeat interval");
}
}
@@ -3003,4 +3245,10 @@
}
return sb.toString();
}
+
+ private void debugInfo(String message)
+ {
+ TRACER.debugInfo(getClass().getSimpleName() + " for baseDN=" + getBaseDN()
+ + " and serverId=" + getServerId() + " " + message);
+ }
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ComputeBestServerTest.java
similarity index 84%
rename from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
rename to opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ComputeBestServerTest.java
index 2387920..ff9098a 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ComputeBestServerTest.java
@@ -25,15 +25,16 @@
* Copyright 2008-2010 Sun Microsystems, Inc.
* Portions Copyright 2013 ForgeRock AS.
*/
-package org.opends.server.replication.plugin;
+package org.opends.server.replication.service;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.Map.Entry;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.data.MapEntry;
import org.opends.messages.Category;
import org.opends.messages.Message;
+import org.opends.messages.MessageDescriptor;
import org.opends.messages.Severity;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
@@ -42,12 +43,15 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.service.ReplicationBroker.RSEvaluations;
import org.opends.server.replication.service.ReplicationBroker.ReplicationServerInfo;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static java.util.Collections.*;
+import static org.assertj.core.data.MapEntry.*;
+import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.service.ReplicationBroker.*;
@@ -106,11 +110,12 @@
Map<Integer, ReplicationServerInfo> rsInfos =
newRSInfos(newRSInfo(11, WINNER, aState, 0, 1));
- ReplicationServerInfo bestServer =
+ RSEvaluations evals =
computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
- assertEquals(bestServer.getServerURL(),
- WINNER, "Wrong best replication server.");
+ assertEquals(evals.getBestRS().getServerURL(), WINNER,
+ "Wrong best replication server.");
+ containsOnly(evals.getEvaluations(), entry(11, NOTE_BEST_RS));
}
private Map<Integer, ReplicationServerInfo> newRSInfos(
@@ -156,11 +161,12 @@
Map<Integer, ReplicationServerInfo> rsInfos =
newRSInfos(newRSInfo(11, WINNER, aState, 0, 1));
- ReplicationServerInfo bestServer =
+ RSEvaluations evals =
computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
- assertEquals(bestServer.getServerURL(),
- WINNER, "Wrong best replication server.");
+ assertEquals(evals.getBestRS().getServerURL(), WINNER,
+ "Wrong best replication server.");
+ containsOnly(evals.getEvaluations(), entry(11, NOTE_BEST_RS));
}
/**
@@ -187,17 +193,16 @@
Map<Integer, ReplicationServerInfo> rsInfos =
newRSInfos(newRSInfo(11, WINNER, aState, 0, 1));
- ReplicationServerInfo bestServer =
+ RSEvaluations evals =
computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
- assertEquals(bestServer.getServerURL(),
- WINNER, "Wrong best replication server.");
+ assertEquals(evals.getBestRS().getServerURL(), WINNER,
+ "Wrong best replication server.");
+ containsOnly(evals.getEvaluations(), entry(11, NOTE_BEST_RS));
}
/**
* Test with one replication server, up to date.
- *
- * @throws Exception If a problem occurred
*/
@Test
public void test1ServerUp() throws Exception
@@ -220,17 +225,16 @@
Map<Integer, ReplicationServerInfo> rsInfos =
newRSInfos(newRSInfo(11, WINNER, aState, 0, 1));
- ReplicationServerInfo bestServer =
+ RSEvaluations evals =
computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
- assertEquals(bestServer.getServerURL(),
- WINNER, "Wrong best replication server.");
+ assertEquals(evals.getBestRS().getServerURL(), WINNER,
+ "Wrong best replication server.");
+ containsOnly(evals.getEvaluations(), entry(11, NOTE_BEST_RS));
}
/**
* Test with 2 replication servers, up to date.
- *
- * @throws Exception If a problem occurred
*/
@Test
public void test2ServersUp() throws Exception
@@ -260,17 +264,18 @@
Map<Integer, ReplicationServerInfo> rsInfos = newRSInfos(
newRSInfo(11, LOOSER1, aState1, 0, 1),
newRSInfo(12, WINNER, aState2, 0, 1));
- ReplicationServerInfo bestServer =
+ RSEvaluations evals =
computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
- assertEquals(bestServer.getServerURL(),
- WINNER, "Wrong best replication server.");
+ assertEquals(evals.getBestRS().getServerURL(), WINNER,
+ "Wrong best replication server.");
+ containsOnly(evals.getEvaluations(),
+ entry(11, NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS),
+ entry(12, NOTE_BEST_RS));
}
/**
* Test with 2 replication servers, up to date, but 2 different group ids.
- *
- * @throws Exception If a problem occurred
*/
@Test
public void testDiffGroup2ServersUp() throws Exception
@@ -302,17 +307,65 @@
Map<Integer, ReplicationServerInfo> rsInfos = newRSInfos(
newRSInfo(11, WINNER, aState1, 0, 1),
newRSInfo(12, LOOSER1, aState2, 0, 2));
- ReplicationServerInfo bestServer =
+ RSEvaluations evals =
computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
- assertEquals(bestServer.getServerURL(),
- WINNER, "Wrong best replication server.");
+ assertEquals(evals.getBestRS().getServerURL(), WINNER,
+ "Wrong best replication server.");
+ containsOnly(evals.getEvaluations(),
+ entry(11, NOTE_BEST_RS),
+ entry(12, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS));
+ }
+
+ private void containsOnly(final Map<Integer, Message> evaluations,
+ MapEntry... entries)
+ {
+ final List<MapEntry> notFound = new ArrayList<MapEntry>(Arrays.asList(entries));
+ for (Iterator<MapEntry> iter = notFound.iterator(); iter.hasNext();)
+ {
+ final MapEntry entry = iter.next();
+ final Message reason = evaluations.get(entry.key);
+ if (reason != null && reason.getDescriptor().equals(entry.value))
+ {
+ iter.remove();
+ }
+ }
+ if (!notFound.isEmpty())
+ {
+ final StringBuilder sb = new StringBuilder("expecting:\n");
+ sb.append(" <").append(getDescription(evaluations)).append(">\n");
+ sb.append(" to contain:\n");
+ sb.append(" <").append(getDescription(Arrays.asList(entries))).append(">\n");
+ sb.append(" but could not find:\n");
+ sb.append(" <").append(getDescription(notFound)).append(">");
+ throw new AssertionError(sb.toString());
+ }
+
+ Assertions.assertThat(evaluations).hasSize(entries.length);
+ }
+
+ private Map<Integer, String> getDescription(Map<Integer, Message> evaluations)
+ {
+ final Map<Integer, String> result = new LinkedHashMap<Integer, String>();
+ for (Entry<Integer, Message> entry : evaluations.entrySet())
+ {
+ result.put(entry.getKey(), entry.getValue().getDescriptor().getKey());
+ }
+ return result;
+ }
+
+ private List<MapEntry> getDescription(List<MapEntry> entries)
+ {
+ final List<MapEntry> result = new ArrayList<MapEntry>();
+ for (MapEntry entry : entries)
+ {
+ result.add(entry(entry.key, ((MessageDescriptor) entry.value).getKey()));
+ }
+ return result;
}
/**
* Test with 2 replication servers, none of them from our group id.
- *
- * @throws Exception If a problem occurred
*/
@Test
public void testNotOurGroup() throws Exception
@@ -342,17 +395,18 @@
Map<Integer, ReplicationServerInfo> rsInfos = newRSInfos(
newRSInfo(11, LOOSER1, aState1, 0, 2),
newRSInfo(12, WINNER, aState2, 0, 2));
- ReplicationServerInfo bestServer =
+ RSEvaluations evals =
computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
- assertEquals(bestServer.getServerURL(),
- WINNER, "Wrong best replication server.");
+ assertEquals(evals.getBestRS().getServerURL(), WINNER,
+ "Wrong best replication server.");
+ containsOnly(evals.getEvaluations(),
+ entry(11, NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS),
+ entry(12, NOTE_BEST_RS));
}
/**
* Test with 3 replication servers, up to date.
- *
- * @throws Exception If a problem occurred
*/
@Test
public void test3ServersUp() throws Exception
@@ -389,17 +443,19 @@
newRSInfo(11, LOOSER1, aState1, 0, 1),
newRSInfo(12, LOOSER2, aState2, 0, 1),
newRSInfo(13, WINNER, aState3, 0, 1));
- ReplicationServerInfo bestServer =
+ RSEvaluations evals =
computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
- assertEquals(bestServer.getServerURL(),
- WINNER, "Wrong best replication server.");
+ assertEquals(evals.getBestRS().getServerURL(), WINNER,
+ "Wrong best replication server.");
+ containsOnly(evals.getEvaluations(),
+ entry(11, NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS),
+ entry(12, NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS),
+ entry(13, NOTE_BEST_RS));
}
/**
* Test with 3 replication servers, up to date, but 2 different group ids.
- *
- * @throws Exception If a problem occurred
*/
@Test
public void testDiffGroup3ServersUp() throws Exception
@@ -438,17 +494,19 @@
// This server has less changes than looser2 but it has the same
// group id as us so he should be the winner
newRSInfo(13, WINNER, aState3, 0, 1));
- ReplicationServerInfo bestServer =
+ RSEvaluations evals =
computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte)1, 0);
- assertEquals(bestServer.getServerURL(),
- WINNER, "Wrong best replication server.");
+ assertEquals(evals.getBestRS().getServerURL(), WINNER,
+ "Wrong best replication server.");
+ containsOnly(evals.getEvaluations(),
+ entry(11, NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS),
+ entry(12, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS),
+ entry(13, NOTE_BEST_RS));
}
/**
* Test with one replication server, late.
- *
- * @throws Exception If a problem occurred
*/
@Test
public void test1ServerLate() throws Exception
@@ -471,43 +529,60 @@
Map<Integer, ReplicationServerInfo> rsInfos =
newRSInfos(newRSInfo(11, WINNER, aState, 0, 1));
- ReplicationServerInfo bestServer =
+ RSEvaluations evals =
computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte) 1, 0);
- assertEquals(bestServer.getServerURL(),
- WINNER, "Wrong best replication server.");
+ assertEquals(evals.getBestRS().getServerURL(), WINNER,
+ "Wrong best replication server.");
+ containsOnly(evals.getEvaluations(), entry(11, NOTE_BEST_RS));
}
@DataProvider(name = "create3ServersData")
public Object[][] create3ServersData() {
return new Object[][] {
// first RS is up to date, the others are late none is local
- { 4, 2, 3, false, 1, 2, 3, false, 2, 3, 4, false},
+ { 4, 2, 3, false,
+ 1, 2, 3, false,
+ 2, 3, 4, false},
// test that the local RS is chosen first when all up to date
- { 4, 2, 3, true, 4, 2, 3, false, 4, 2, 3, false},
+ { 4, 2, 3, true,
+ 4, 2, 3, false,
+ 4, 2, 3, false},
// test that the local ServerID is more important than the others
- { 4, 0, 0, false, 2, 100, 100, false, 1, 100, 100, false},
+ { 4, 0, 0, false,
+ 2, 100, 100, false,
+ 1, 100, 100, false},
// test that a remote RS is chosen first when up to date when the local
// one is late
- { 4, 1, 1, false, 3, 1, 1, true, 3, 1, 1, false},
+ { 4, 1, 1, false,
+ 3, 1, 1, true,
+ 3, 1, 1, false},
// test that the local RS is not chosen first when it is missing
// local changes
- { 4, 1, 1, false, 3, 2, 3, false, 1, 1, 1, true},
+ { 4, 1, 1, false,
+ 3, 2, 3, false,
+ 1, 1, 1, true},
// test that a RS which is more up to date than the DS is chosen
- { 5, 1, 1, false, 2, 0, 0, false, 1, 1, 1, false},
+ { 5, 1, 1, false,
+ 2, 0, 0, false,
+ 1, 1, 1, false},
// test that a RS which is more up to date than the DS is chosen even
// is some RS with the same last change from the DS
- { 5, 1, 1, false, 4, 0, 0, false, 4, 1, 1, false},
+ { 5, 1, 1, false,
+ 4, 0, 0, false,
+ 4, 1, 1, false},
// test that the local RS is chosen first when it is missing
// the same local changes as the other RSs
- { 3, 1, 1, true, 2, 1, 1, false, 3, 1, 1, false},
+ { 3, 1, 1, true,
+ 2, 1, 1, false,
+ 3, 1, 1, false},
};
}
@@ -564,13 +639,31 @@
newRSInfo(11, LOOSER1, aState1, 0, 1),
newRSInfo(12, WINNER, aState2, 0, 1),
newRSInfo(13, LOOSER2, aState3, 0, 1));
- ReplicationServerInfo bestServer =
+ RSEvaluations evals =
computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte) 1, 0);
ReplicationServer.onlyForTestsClearLocalReplicationServerList();
- assertEquals(bestServer.getServerURL(),
- WINNER, "Wrong best replication server.");
+ assertEquals(evals.getBestRS().getServerURL(), WINNER,
+ "Wrong best replication server.");
+ final boolean winnerIsLatestRS = winnerT1 > 4 && looser1T1 == 4 && looser2T1 == 4;
+ containsOnly(evals.getEvaluations(),
+ entry(11, getEval1(winnerIsLocal, looser1IsLocal, winnerIsLatestRS)),
+ entry(12, NOTE_BEST_RS),
+ entry(13, getEval1(winnerIsLocal, looser2IsLocal, winnerIsLatestRS)));
+ }
+
+ private MessageDescriptor getEval1(boolean winnerIsLocal, boolean looserIsLocal, boolean winnerIsLatestRS)
+ {
+ if (winnerIsLocal && !looserIsLocal)
+ {
+ return NOTE_RS_ON_DIFFERENT_VM_THAN_DS;
+ }
+ else if (winnerIsLatestRS)
+ {
+ return NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS;
+ }
+ return NOTE_RS_LATER_THAN_LOCAL_DS;
}
@DataProvider(name = "test3ServersMoreCriteria")
@@ -578,15 +671,21 @@
return new Object[][] {
// Test that a RS is chosen if its group is ok whereas the other parameters
// are not ok
- { 1L, 1L, (byte)1, false, 4L, 0L, (byte)2, false, 4L, 0L, (byte)3, false},
+ { 1L, 1L, 1, false,
+ 4L, 0L, 2, false,
+ 4L, 0L, 3, false},
// Test that a RS is chosen if its genid is ok (all RS with same group)
// and state is not ok
- { 1L, 0L, (byte)1, false, 4L, 1L, (byte)1, false, 4L, 2L, (byte)1, false},
+ { 1L, 0L, 1, false,
+ 4L, 1L, 1, false,
+ 4L, 2L, 1, false},
// Test that a RS is chosen if all servers have wrong genid and group id
// but it is local
- { 1L, 1L, (byte)2, true, 4L, 2L, (byte)3, false, 5L, 3L, (byte)4, false}
+ { 1L, 1L, 2, true,
+ 4L, 2L, 3, false,
+ 5L, 3L, 4, false}
};
}
@@ -595,9 +694,9 @@
*/
@Test(dataProvider = "test3ServersMoreCriteria")
public void test3ServersMoreCriteria(
- long winnerT1, long winnerGenId, byte winnerGroupId, boolean winnerIsLocal,
- long looser1T1, long looser1GenId, byte looser1GroupId, boolean looser1IsLocal,
- long looser2T1, long looser2GenId, byte looser2GroupId, boolean looser2IsLocal)
+ long winnerT1, long winnerGenId, int winnerGroupId, boolean winnerIsLocal,
+ long looser1T1, long looser1GenId, int looser1GroupId, boolean looser1IsLocal,
+ long looser2T1, long looser2GenId, int looser2GroupId, boolean looser2IsLocal)
throws Exception
{
String testCase = "test3ServersMoreCriteria";
@@ -635,13 +734,30 @@
newRSInfo(11, LOOSER1, aState1, looser1GenId, looser1GroupId),
newRSInfo(12, WINNER, aState2, winnerGenId, winnerGroupId),
newRSInfo(13, LOOSER2, aState3, looser2GenId, looser2GroupId));
- ReplicationServerInfo bestServer =
+ RSEvaluations evals =
computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, (byte) 1, 0);
ReplicationServer.onlyForTestsClearLocalReplicationServerList();
- assertEquals(bestServer.getServerURL(),
- WINNER, "Wrong best replication server.");
+ assertEquals(evals.getBestRS().getServerURL(), WINNER,
+ "Wrong best replication server.");
+ containsOnly(evals.getEvaluations(),
+ entry(11, getEval2(winnerGroupId == looser1GroupId, winnerIsLocal, looser1IsLocal)),
+ entry(12, NOTE_BEST_RS),
+ entry(13, getEval2(winnerGroupId == looser2GroupId, winnerIsLocal, looser2IsLocal)));
+ }
+
+ private MessageDescriptor getEval2(boolean sameGroupId, boolean winnerIsLocal, boolean looserIsLocal)
+ {
+ if (winnerIsLocal && !looserIsLocal)
+ {
+ return NOTE_RS_ON_DIFFERENT_VM_THAN_DS;
+ }
+ else if (!sameGroupId)
+ {
+ return NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS;
+ }
+ return NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS;
}
@SuppressWarnings("unchecked")
@@ -691,7 +807,7 @@
rsInfos,
-1, // current RS id
-1, // local DS id
- rsInfos.values().iterator().next().getServerURL(), // winner url
+ "BwinnerHost:123", // winner url
};
/**
@@ -738,7 +854,7 @@
rsInfos,
-1, // current RS id
-1, // local DS id
- rsInfos.values().iterator().next().getServerURL(), // winner url
+ "DwinnerHost:123", // winner url
};
/**
@@ -1311,8 +1427,9 @@
debugInfo("Starting " + testCase);
- ReplicationServerInfo bestServer =
- computeBestServerForWeight(servers, currentRsServerId, localServerId);
+ final RSEvaluations evals = new RSEvaluations(localServerId, servers);
+ computeBestServerForWeight(evals, currentRsServerId, localServerId);
+ final ReplicationServerInfo bestServer = evals.getBestRS();
if (winnerUrl == null)
{
--
Gitblit v1.10.0