From 4bb3d3af3d032a98f2ca318c81be5c4f81034b8f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 11 Oct 2013 12:27:16 +0000
Subject: [PATCH] ReplicationBroker.java: In computeInitialServerStatus(), used early exits. In computeBestServerForWeight(), extracted methods computeBestServerWhenNotConnected() and computeBestServerWhenConnected(). Changed replicationServerUrls from Collection<String> to Set<String>. Removed useless field initialization to null. Renamed _publish() to publish() + reduced local variables scope. In receive(), renamed local variable replicationServerID to previousRsServerID and used this one more rather than the field. In changeConfig(), used Set.equals(). Changed getReplicationMonitor() to getReplicationMonitorInstanceName().
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 564 +++++++++++++++++++++++++++----------------------------
1 files changed, 278 insertions(+), 286 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index d486e8d..8bd15cb 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,7 +31,10 @@
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
-import java.net.*;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -72,7 +75,7 @@
/**
* Replication server URLs under this format: "<code>hostname:port</code>".
*/
- private volatile Collection<String> replicationServerUrls;
+ private volatile Set<String> replicationServerUrls;
private volatile boolean connected = false;
/**
* String reported under CSN=monitor when there is no connected RS.
@@ -98,9 +101,9 @@
/** The server id of the RS we are connected to. */
private Integer rsServerId = -1;
/** The server URL of the RS we are connected to. */
- private String rsServerUrl = null;
+ private String rsServerUrl;
/** Our replication domain. */
- private ReplicationDomain domain = null;
+ private ReplicationDomain domain;
/**
* This object is used as a conditional event to be notified about
* the reception of monitor information from the Replication Server.
@@ -121,7 +124,7 @@
/**
* A thread to monitor heartbeats on the session.
*/
- private HeartbeatMonitor heartbeatMonitor = null;
+ private HeartbeatMonitor heartbeatMonitor;
/**
* The number of times the connection was lost.
*/
@@ -140,7 +143,7 @@
* The thread that publishes messages to the RS containing the current
* change time of this DS.
*/
- private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null;
+ private CTHeartbeatPublisherThread ctHeartbeatPublisherThread;
/**
* The expected period in milliseconds between these messages are sent
* to the replication server. Zero means heartbeats are off.
@@ -149,8 +152,11 @@
/*
* Properties for the last topology info received from the network.
*/
- // Info for other DSs.
- // Warning: does not contain info for us (for our server id)
+ /**
+ * Info for other DSs.
+ * <p>
+ * Warning: does not contain info for us (for our server id)
+ */
private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
private volatile long generationID;
private volatile int updateDoneCount = 0;
@@ -162,8 +168,7 @@
* replication server one wants to connect. Key: replication server id Value:
* replication server info for the matching replication server id
*/
- private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos
- = null;
+ private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos;
/**
* This integer defines when the best replication server checking algorithm
@@ -256,7 +261,7 @@
*
* @param replicationServers list of servers used
*/
- public void start(Collection<String> replicationServers)
+ public void start(Set<String> replicationServers)
{
synchronized (startStopLock)
{
@@ -1068,60 +1073,45 @@
{
// RS has no generation id
return ServerStatus.NORMAL_STATUS;
- } else
+ }
+ else if (rsGenId != dsGenId)
{
- if (rsGenId == dsGenId)
+ // DS and RS do not have same generation id
+ return ServerStatus.BAD_GEN_ID_STATUS;
+ }
+ else
+ {
+ /*
+ DS and RS have same generation id
+
+ Determine if we are late or not to replay changes. RS uses a
+ threshold value for pending changes to be replayed by a DS to
+ determine if the DS is in normal status or in degraded status.
+ Let's compare the local and remote server state using this threshold
+ value to determine if we are late or not
+ */
+
+ int nChanges = ServerState.diffChanges(rsState, state);
+ if (debugEnabled())
{
- /*
- DS and RS have same generation id
-
- Determine if we are late or not to replay changes. RS uses a
- threshold value for pending changes to be replayed by a DS to
- determine if the DS is in normal status or in degraded status.
- Let's compare the local and remote server state using this threshold
- value to determine if we are late or not
- */
-
- ServerStatus initStatus;
- int nChanges = ServerState.diffChanges(rsState, state);
-
- if (debugEnabled())
- {
- TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
- + serverId + " computed " + nChanges + " changes late.");
- }
-
- /*
- Check status to know if it is relevant to change the status. Do not
- take RSD lock to test. If we attempt to change the status whereas
- we are in a status that do not allows that, this will be noticed by
- the changeStatusFromStatusAnalyzer method. This allows to take the
- lock roughly only when needed versus every sleep time timeout.
- */
- if (degradedStatusThreshold > 0)
- {
- if (nChanges >= degradedStatusThreshold)
- {
- initStatus = ServerStatus.DEGRADED_STATUS;
- } else
- {
- initStatus = ServerStatus.NORMAL_STATUS;
- }
- } else
- {
- /*
- 0 threshold value means no degrading system used (no threshold):
- force normal status
- */
- initStatus = ServerStatus.NORMAL_STATUS;
- }
-
- return initStatus;
- } else
- {
- // DS and RS do not have same generation id
- return ServerStatus.BAD_GEN_ID_STATUS;
+ TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
+ + serverId + " computed " + nChanges + " changes late.");
}
+
+ /*
+ Check status to know if it is relevant to change the status. Do not
+ take RSD lock to test. If we attempt to change the status whereas
+ we are in a status that do not allows that, this will be noticed by
+ the changeStatusFromStatusAnalyzer method. This allows to take the
+ lock roughly only when needed versus every sleep time timeout.
+ */
+ if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold)
+ {
+ return ServerStatus.DEGRADED_STATUS;
+ }
+ // degradedStatusThreshold value of '0' means no degrading system used
+ // (no threshold): force normal status
+ return ServerStatus.NORMAL_STATUS;
}
}
@@ -1472,20 +1462,16 @@
return bestServers.values().iterator().next();
}
- if (firstConnection)
- {
- // We are not connected to a server yet
- return computeBestServerForWeight(bestServers, -1, -1);
- } else
- {
- /*
- We are already connected to a RS: compute the best RS as far as the
- weights is concerned. If this is another one, some DS must
- disconnect.
- */
- return computeBestServerForWeight(bestServers, rsServerId,
- localServerId);
- }
+ if (firstConnection)
+ {
+ // We are not connected to a server yet
+ return computeBestServerForWeight(bestServers, -1, -1);
+ }
+ /*
+ * We are already connected to a RS: compute the best RS as far as the
+ * weights is concerned. If this is another one, some DS must disconnect.
+ */
+ return computeBestServerForWeight(bestServers, rsServerId, localServerId);
}
/**
@@ -1783,11 +1769,12 @@
sumOfWeights += replicationServerInfo.getWeight();
sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
}
+
// Distance (difference) of the current loads to the load goals of each RS:
// key:server id, value: distance
Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
// Precision for the operations (number of digits after the dot)
- MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
+ final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
for (Integer rsId : bestServers.keySet())
{
ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
@@ -1812,199 +1799,210 @@
if (currentRsServerId == -1)
{
- // The local server is not connected yet
+ // The local server is not connected yet, find best server to connect to,
+ // taking the weights into account.
+ return computeBestServerWhenNotConnected(bestServers, loadDistances);
+ }
+ // The local server is currently connected to a RS, let's see if it must
+ // disconnect or not, taking the weights into account.
+ return computeBestServerWhenConnected(bestServers, loadDistances,
+ localServerId, currentRsServerId, sumOfWeights, sumOfConnectedDSs);
+ }
+ private static ReplicationServerInfo computeBestServerWhenNotConnected(
+ Map<Integer, ReplicationServerInfo> bestServers,
+ Map<Integer, BigDecimal> loadDistances)
+ {
+ /*
+ * Find the server with the current highest distance to its load goal and
+ * choose it. Make an exception if every server is correctly balanced,
+ * that is every current load distances are equal to 0, in that case,
+ * choose the server with the highest weight
+ */
+ int bestRsId = 0; // If all server equal, return the first one
+ float highestDistance = Float.NEGATIVE_INFINITY;
+ boolean allRsWithZeroDistance = true;
+ int highestWeightRsId = -1;
+ int highestWeight = -1;
+ for (Integer rsId : bestServers.keySet())
+ {
+ float loadDistance = loadDistances.get(rsId).floatValue();
+ if (loadDistance > highestDistance)
+ {
+ // This server is far more from its balance point
+ bestRsId = rsId;
+ highestDistance = loadDistance;
+ }
+ if (loadDistance != 0)
+ {
+ allRsWithZeroDistance = false;
+ }
+ int weight = bestServers.get(rsId).getWeight();
+ if (weight > highestWeight)
+ {
+ // This server has a higher weight
+ highestWeightRsId = rsId;
+ highestWeight = weight;
+ }
+ }
+ // All servers with a 0 distance ?
+ if (allRsWithZeroDistance)
+ {
+ // Choose server with the highest weight
+ bestRsId = highestWeightRsId;
+ }
+ return bestServers.get(bestRsId);
+ }
+
+ private static ReplicationServerInfo computeBestServerWhenConnected(
+ Map<Integer, ReplicationServerInfo> bestServers,
+ Map<Integer, BigDecimal> loadDistances, int localServerId,
+ int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
+ {
+ final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
+ float currentLoadDistance =
+ loadDistances.get(currentRsServerId).floatValue();
+ if (currentLoadDistance < 0)
+ {
/*
- * Find the server with the current highest distance to its load goal and
- * choose it. Make an exception if every server is correctly balanced,
- * that is every current load distances are equal to 0, in that case,
- * choose the server with the highest weight
- */
- int bestRsId = 0; // If all server equal, return the first one
- float highestDistance = Float.NEGATIVE_INFINITY;
- boolean allRsWithZeroDistance = true;
- int highestWeightRsId = -1;
- int highestWeight = -1;
+ Too much DSs connected to the current RS, compared with its load
+ goal:
+ Determine the potential number of DSs to disconnect from the current
+ RS and see if the local DS is part of them: the DSs that must
+ disconnect are those with the lowest server id.
+ Compute the sum of the distances of the load goals of the other RSs
+ */
+ BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
for (Integer rsId : bestServers.keySet())
{
- float loadDistance = loadDistances.get(rsId).floatValue();
- if (loadDistance > highestDistance)
+ if (rsId != currentRsServerId)
{
- // This server is far more from its balance point
- bestRsId = rsId;
- highestDistance = loadDistance;
- }
- if (loadDistance != 0)
- {
- allRsWithZeroDistance = false;
- }
- int weight = bestServers.get(rsId).getWeight();
- if (weight > highestWeight)
- {
- // This server has a higher weight
- highestWeightRsId = rsId;
- highestWeight = weight;
+ sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
+ loadDistances.get(rsId), mathContext);
}
}
- // All servers with a 0 distance ?
- if (allRsWithZeroDistance)
- {
- // Choose server with the highest weight
- bestRsId = highestWeightRsId;
- }
- return bestServers.get(bestRsId);
- } else
- {
- // The local server is currently connected to a RS, let's see if it must
- // disconnect or not, taking the weights into account.
- float currentLoadDistance =
- loadDistances.get(currentRsServerId).floatValue();
- if (currentLoadDistance < 0)
+ if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
{
/*
- Too much DSs connected to the current RS, compared with its load
- goal:
- Determine the potential number of DSs to disconnect from the current
- RS and see if the local DS is part of them: the DSs that must
- disconnect are those with the lowest server id.
- Compute the sum of the distances of the load goals of the other RSs
+ The average distance of the other RSs shows a lack of DSs.
+ Compute the number of DSs to disconnect from the current RS,
+ rounding to the nearest integer number. Do only this if there is
+ no risk of yoyo effect: when the exact balance cannot be
+ established due to the current number of DSs connected, do not
+ disconnect a DS. A simple example where the balance cannot be
+ reached is:
+ - RS1 has weight 1 and 2 DSs
+ - RS2 has weight 1 and 1 DS
+ => disconnecting a DS from RS1 to reconnect it to RS2 would have no
+ sense as this would lead to the reverse situation. In that case,
+ the perfect balance cannot be reached and we must stick to the
+ current situation, otherwise the DS would keep move between the 2
+ RSs
*/
- BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
- for (Integer rsId : bestServers.keySet())
+ float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
+ multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
+ .floatValue();
+ int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
+
+ // Avoid yoyo effect
+ if (overloadingDSsNumber == 1)
{
- if (rsId != currentRsServerId)
- {
- sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
- loadDistances.get(rsId), mathContext);
- }
- }
-
- if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
- {
- /*
- The average distance of the other RSs shows a lack of DSs.
- Compute the number of DSs to disconnect from the current RS,
- rounding to the nearest integer number. Do only this if there is
- no risk of yoyo effect: when the exact balance cannot be
- established due to the current number of DSs connected, do not
- disconnect a DS. A simple example where the balance cannot be
- reached is:
- - RS1 has weight 1 and 2 DSs
- - RS2 has weight 1 and 1 DS
- => disconnecting a DS from RS1 to reconnect it to RS2 would have no
- sense as this would lead to the reverse situation. In that case,
- the perfect balance cannot be reached and we must stick to the
- current situation, otherwise the DS would keep move between the 2
- RSs
- */
- float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
- multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
- .floatValue();
- int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
-
- // Avoid yoyo effect
- if (overloadingDSsNumber == 1)
- {
- // What would be the new load distance for the current RS if
- // we disconnect some DSs ?
- ReplicationServerInfo currentReplicationServerInfo =
- bestServers.get(currentRsServerId);
-
- int currentRsWeight = currentReplicationServerInfo.getWeight();
- BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
- BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
- BigDecimal currentRsLoadGoalBd =
- currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
- BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
- if (sumOfConnectedDSs != 0)
- {
- int connectedDSs = currentReplicationServerInfo.
- getConnectedDSNumber();
- BigDecimal potentialNewConnectedDSsBd =
- BigDecimal.valueOf(connectedDSs - 1);
- BigDecimal sumOfConnectedDSsBd =
- BigDecimal.valueOf(sumOfConnectedDSs);
- potentialCurrentRsNewLoadBd =
- potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
- mathContext);
- }
- BigDecimal potentialCurrentRsNewLoadDistanceBd =
- currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
- mathContext);
-
- // What would be the new load distance for the other RSs ?
- BigDecimal additionalDsLoadBd =
- BigDecimal.ONE.divide(
- BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
- BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
- sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
- mathContext);
-
- /*
- Now compare both values: we must no disconnect the DS if this
- is for going in a situation where the load distance of the other
- RSs is the opposite of the future load distance of the local RS
- or we would evaluate that we should disconnect just after being
- arrived on the new RS. But we should disconnect if we reach the
- perfect balance (both values are 0).
- */
- MathContext roundMc =
- new MathContext(6, RoundingMode.DOWN);
- BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
- potentialCurrentRsNewLoadDistanceBd.round(roundMc);
- BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
- potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
-
- if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
- BigDecimal.ZERO) != 0)
- && (potentialCurrentRsNewLoadDistanceBdRounded.equals(
- potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
- {
- // Avoid the yoyo effect, and keep the local DS connected to its
- // current RS
- return bestServers.get(currentRsServerId);
- }
- }
-
- // Prepare a sorted list (from lowest to highest) or DS server ids
- // connected to the current RS
- ReplicationServerInfo currentRsInfo =
+ // What would be the new load distance for the current RS if
+ // we disconnect some DSs ?
+ ReplicationServerInfo currentReplicationServerInfo =
bestServers.get(currentRsServerId);
- List<Integer> serversConnectedToCurrentRS =
- currentRsInfo.getConnectedDSs();
- List<Integer> sortedServers = new ArrayList<Integer>(
- serversConnectedToCurrentRS);
- Collections.sort(sortedServers);
- // Go through the list of DSs to disconnect and see if the local
- // server is part of them.
- int index = 0;
- while (overloadingDSsNumber > 0)
+ int currentRsWeight = currentReplicationServerInfo.getWeight();
+ BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
+ BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
+ BigDecimal currentRsLoadGoalBd =
+ currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
+ BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
+ if (sumOfConnectedDSs != 0)
{
- int severToDisconnectId = sortedServers.get(index);
- if (severToDisconnectId == localServerId)
- {
- // The local server is part of the DSs to disconnect
- return null;
- }
- overloadingDSsNumber--;
- index++;
+ int connectedDSs = currentReplicationServerInfo.
+ getConnectedDSNumber();
+ BigDecimal potentialNewConnectedDSsBd =
+ BigDecimal.valueOf(connectedDSs - 1);
+ BigDecimal sumOfConnectedDSsBd =
+ BigDecimal.valueOf(sumOfConnectedDSs);
+ potentialCurrentRsNewLoadBd =
+ potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
+ mathContext);
}
+ BigDecimal potentialCurrentRsNewLoadDistanceBd =
+ currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
+ mathContext);
- // The local server is not part of the servers to disconnect from the
- // current RS.
- } else {
- // The average distance of the other RSs does not show a lack of DSs:
- // no need to disconnect any DS from the current RS.
+ // What would be the new load distance for the other RSs ?
+ BigDecimal additionalDsLoadBd =
+ BigDecimal.ONE.divide(
+ BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
+ BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
+ sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
+ mathContext);
+
+ /*
+ Now compare both values: we must no disconnect the DS if this
+ is for going in a situation where the load distance of the other
+ RSs is the opposite of the future load distance of the local RS
+ or we would evaluate that we should disconnect just after being
+ arrived on the new RS. But we should disconnect if we reach the
+ perfect balance (both values are 0).
+ */
+ MathContext roundMc = new MathContext(6, RoundingMode.DOWN);
+ BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
+ potentialCurrentRsNewLoadDistanceBd.round(roundMc);
+ BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
+ potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
+
+ if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
+ BigDecimal.ZERO) != 0)
+ && (potentialCurrentRsNewLoadDistanceBdRounded.equals(
+ potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
+ {
+ // Avoid the yoyo effect, and keep the local DS connected to its
+ // current RS
+ return bestServers.get(currentRsServerId);
+ }
}
+
+ // Prepare a sorted list (from lowest to highest) or DS server ids
+ // connected to the current RS
+ ReplicationServerInfo currentRsInfo =
+ bestServers.get(currentRsServerId);
+ List<Integer> serversConnectedToCurrentRS =
+ new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
+ Collections.sort(serversConnectedToCurrentRS);
+
+ // Go through the list of DSs to disconnect and see if the local
+ // server is part of them.
+ int index = 0;
+ while (overloadingDSsNumber > 0)
+ {
+ int serverIdToDisconnect = serversConnectedToCurrentRS.get(index);
+ if (serverIdToDisconnect == localServerId)
+ {
+ // The local server is part of the DSs to disconnect
+ return null;
+ }
+ overloadingDSsNumber--;
+ index++;
+ }
+
+ // The local server is not part of the servers to disconnect from the
+ // current RS.
} else {
- // The RS load goal is reached or there are not enough DSs connected to
- // it to reach it: do not disconnect from this RS and return rsInfo for
- // this RS
+ // The average distance of the other RSs does not show a lack of DSs:
+ // no need to disconnect any DS from the current RS.
}
- return bestServers.get(currentRsServerId);
+ } else {
+ // The RS load goal is reached or there are not enough DSs connected to
+ // it to reach it: do not disconnect from this RS and return rsInfo for
+ // this RS
}
+ return bestServers.get(currentRsServerId);
}
/**
@@ -2117,7 +2115,7 @@
*/
public void publish(ReplicationMsg msg)
{
- _publish(msg, false, true);
+ publish(msg, false, true);
}
/**
@@ -2128,7 +2126,7 @@
*/
public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
{
- return _publish(msg, false, retryOnFailure);
+ return publish(msg, false, retryOnFailure);
}
/**
@@ -2137,7 +2135,7 @@
*/
public void publishRecovery(ReplicationMsg msg)
{
- _publish(msg, true, true);
+ publish(msg, true, true);
}
/**
@@ -2147,7 +2145,7 @@
* @param retryOnFailure whether retry should be done on failure
* @return whether the message was successfully sent.
*/
- boolean _publish(ReplicationMsg msg, boolean recoveryMsg,
+ private boolean publish(ReplicationMsg msg, boolean recoveryMsg,
boolean retryOnFailure)
{
boolean done = false;
@@ -2175,10 +2173,6 @@
try
{
- boolean credit;
- Session current_session;
- Semaphore currentWindowSemaphore;
-
/*
save the session at the time when we acquire the
sendwindow credit so that we can make sure later
@@ -2187,9 +2181,11 @@
on a session with a credit that was acquired from a previous
session.
*/
+ Session currentSession;
+ Semaphore currentWindowSemaphore;
synchronized (connectPhaseLock)
{
- current_session = session;
+ currentSession = session;
currentWindowSemaphore = sendWindow;
}
@@ -2204,6 +2200,7 @@
return false;
}
+ boolean credit;
if (msg instanceof UpdateMsg)
{
/*
@@ -2217,6 +2214,7 @@
{
credit = true;
}
+
if (credit)
{
synchronized (connectPhaseLock)
@@ -2228,8 +2226,7 @@
reconnection happened and we need to restart from scratch.
*/
- if ((session != null) &&
- (session == current_session))
+ if (session != null && session == currentSession)
{
session.publish(msg);
done = true;
@@ -2340,7 +2337,7 @@
break;
}
- final int replicationServerID = rsServerId;
+ final int previousRsServerID = rsServerId;
try
{
ReplicationMsg msg = savedSession.receive();
@@ -2375,7 +2372,7 @@
{
// RS performs a proper disconnection
Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
- replicationServerID, savedSession.getReadableRemoteAddress(),
+ previousRsServerID, savedSession.getReadableRemoteAddress(),
serverId, baseDN.toNormalizedString());
logError(message);
@@ -2425,13 +2422,12 @@
{
// Stable topology (no topo msg since few seconds): proceed with
// best server checking.
- ReplicationServerInfo bestServerInfo =
- computeBestReplicationServer(false, rsServerId, state,
- replicationServerInfos, serverId, groupId,
- generationID);
-
- if ((rsServerId != -1) && ((bestServerInfo == null) ||
- (bestServerInfo.getServerId() != rsServerId)))
+ final ReplicationServerInfo bestServerInfo =
+ computeBestReplicationServer(false, previousRsServerID, state,
+ replicationServerInfos, serverId, groupId, generationID);
+ if (previousRsServerID != -1
+ && (bestServerInfo == null
+ || bestServerInfo.getServerId() != previousRsServerID))
{
// The best replication server is no more the one we are
// currently using. Disconnect properly then reconnect.
@@ -2439,14 +2435,14 @@
if (bestServerInfo == null)
{
message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
- serverId, replicationServerID,
+ serverId, previousRsServerID,
savedSession.getReadableRemoteAddress(),
baseDN.toNormalizedString());
}
else
{
message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
- serverId, replicationServerID,
+ serverId, previousRsServerID,
savedSession.getReadableRemoteAddress(),
bestServerInfo.getServerId(),
baseDN.toNormalizedString());
@@ -2483,7 +2479,7 @@
{
// We did not initiate the close on our side, log an error message.
Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
- serverId, baseDN.toNormalizedString(), replicationServerID,
+ serverId, baseDN.toNormalizedString(), previousRsServerID,
savedSession.getReadableRemoteAddress());
logError(message);
}
@@ -2684,9 +2680,8 @@
* requires to restart the service.
* @param groupId The new group id to use
*/
- public boolean changeConfig(
- Collection<String> replicationServers, int window, long heartbeatInterval,
- byte groupId)
+ public boolean changeConfig(Set<String> replicationServers, int window,
+ long heartbeatInterval, byte groupId)
{
// These parameters needs to be renegotiated with the ReplicationServer
// so if they have changed, that requires restarting the session with
@@ -2695,8 +2690,7 @@
// the connection is modified
boolean needToRestartSession =
this.replicationServerUrls == null
- || replicationServers.size() != this.replicationServerUrls.size()
- || !replicationServers.containsAll(this.replicationServerUrls)
+ || !replicationServers.equals(this.replicationServerUrls)
|| window != this.maxRcvWindow
|| heartbeatInterval != this.heartbeatInterval
|| groupId != this.groupId;
@@ -2788,12 +2782,10 @@
*/
public List<RSInfo> getRsList()
{
- List<RSInfo> result = new ArrayList<RSInfo>();
-
- for (ReplicationServerInfo replicationServerInfo :
- replicationServerInfos.values())
+ final List<RSInfo> result = new ArrayList<RSInfo>();
+ for (ReplicationServerInfo rsInfo : replicationServerInfos.values())
{
- result.add(replicationServerInfo.toRSInfo());
+ result.add(rsInfo.toRSInfo());
}
return result;
}
@@ -2989,14 +2981,14 @@
}
/**
- * Returns the replication monitor associated with this broker.
+ * Returns the replication monitor instance name associated with this broker.
*
- * @return The replication monitor.
+ * @return The replication monitor instance name.
*/
- ReplicationMonitor getReplicationMonitor()
+ String getReplicationMonitorInstanceName()
{
// Only invoked by replication domain so always non-null.
- return monitor;
+ return monitor.getMonitorInstanceName();
}
private void setSession(final Session newSession)
--
Gitblit v1.10.0