From 4bb3d3af3d032a98f2ca318c81be5c4f81034b8f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 11 Oct 2013 12:27:16 +0000
Subject: [PATCH] ReplicationBroker.java: various code cleanups

In computeInitialServerStatus(), used early exits.
In computeBestServerForWeight(), extracted the methods
computeBestServerWhenNotConnected() and computeBestServerWhenConnected().
Changed replicationServerUrls from Collection<String> to Set<String>.
Removed useless field initializations to null.
Renamed _publish() to publish(), made it private, and reduced the scope of
local variables.
In receive(), renamed the local variable replicationServerID to
previousRsServerID and used it instead of the field where possible.
In changeConfig(), used Set.equals().
Changed getReplicationMonitor() to getReplicationMonitorInstanceName().
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 2
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java | 9
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java | 22 -
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 10
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 564 +++++++++++++++++++++----------------------
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java | 16 +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 44 +--
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java | 17
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java | 23 -
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java | 13
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java | 14
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 42 +--
12 files changed, 357 insertions(+), 419 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index d486e8d..8bd15cb 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,7 +31,10 @@
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
-import java.net.*;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -72,7 +75,7 @@
/**
* Replication server URLs under this format: "<code>hostname:port</code>".
*/
- private volatile Collection<String> replicationServerUrls;
+ private volatile Set<String> replicationServerUrls;
private volatile boolean connected = false;
/**
* String reported under CSN=monitor when there is no connected RS.
@@ -98,9 +101,9 @@
/** The server id of the RS we are connected to. */
private Integer rsServerId = -1;
/** The server URL of the RS we are connected to. */
- private String rsServerUrl = null;
+ private String rsServerUrl;
/** Our replication domain. */
- private ReplicationDomain domain = null;
+ private ReplicationDomain domain;
/**
* This object is used as a conditional event to be notified about
* the reception of monitor information from the Replication Server.
@@ -121,7 +124,7 @@
/**
* A thread to monitor heartbeats on the session.
*/
- private HeartbeatMonitor heartbeatMonitor = null;
+ private HeartbeatMonitor heartbeatMonitor;
/**
* The number of times the connection was lost.
*/
@@ -140,7 +143,7 @@
* The thread that publishes messages to the RS containing the current
* change time of this DS.
*/
- private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null;
+ private CTHeartbeatPublisherThread ctHeartbeatPublisherThread;
/**
* The expected period in milliseconds between these messages are sent
* to the replication server. Zero means heartbeats are off.
@@ -149,8 +152,11 @@
/*
* Properties for the last topology info received from the network.
*/
- // Info for other DSs.
- // Warning: does not contain info for us (for our server id)
+ /**
+ * Info for other DSs.
+ * <p>
+ * Warning: does not contain info for us (for our server id)
+ */
private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
private volatile long generationID;
private volatile int updateDoneCount = 0;
@@ -162,8 +168,7 @@
* replication server one wants to connect. Key: replication server id Value:
* replication server info for the matching replication server id
*/
- private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos
- = null;
+ private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos;
/**
* This integer defines when the best replication server checking algorithm
@@ -256,7 +261,7 @@
*
* @param replicationServers list of servers used
*/
- public void start(Collection<String> replicationServers)
+ public void start(Set<String> replicationServers)
{
synchronized (startStopLock)
{
@@ -1068,60 +1073,45 @@
{
// RS has no generation id
return ServerStatus.NORMAL_STATUS;
- } else
+ }
+ else if (rsGenId != dsGenId)
{
- if (rsGenId == dsGenId)
+    // DS and RS do not have the same generation id
+ return ServerStatus.BAD_GEN_ID_STATUS;
+ }
+ else
+ {
+      /*
+      DS and RS have the same generation id.
+
+      Determine whether we are late to replay changes. The RS uses a
+      threshold value for pending changes to be replayed by a DS to
+      determine whether the DS is in normal status or in degraded status.
+      Let's compare the local and remote server states using this threshold
+      value to determine whether we are late.
+      */
+
+ int nChanges = ServerState.diffChanges(rsState, state);
+ if (debugEnabled())
{
- /*
- DS and RS have same generation id
-
- Determine if we are late or not to replay changes. RS uses a
- threshold value for pending changes to be replayed by a DS to
- determine if the DS is in normal status or in degraded status.
- Let's compare the local and remote server state using this threshold
- value to determine if we are late or not
- */
-
- ServerStatus initStatus;
- int nChanges = ServerState.diffChanges(rsState, state);
-
- if (debugEnabled())
- {
- TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
- + serverId + " computed " + nChanges + " changes late.");
- }
-
- /*
- Check status to know if it is relevant to change the status. Do not
- take RSD lock to test. If we attempt to change the status whereas
- we are in a status that do not allows that, this will be noticed by
- the changeStatusFromStatusAnalyzer method. This allows to take the
- lock roughly only when needed versus every sleep time timeout.
- */
- if (degradedStatusThreshold > 0)
- {
- if (nChanges >= degradedStatusThreshold)
- {
- initStatus = ServerStatus.DEGRADED_STATUS;
- } else
- {
- initStatus = ServerStatus.NORMAL_STATUS;
- }
- } else
- {
- /*
- 0 threshold value means no degrading system used (no threshold):
- force normal status
- */
- initStatus = ServerStatus.NORMAL_STATUS;
- }
-
- return initStatus;
- } else
- {
- // DS and RS do not have same generation id
- return ServerStatus.BAD_GEN_ID_STATUS;
+ TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
+ + serverId + " computed " + nChanges + " changes late.");
}
+
+      /*
+      Check the status to know whether it is relevant to change it. Do not
+      take the RSD lock to test. If we attempt to change the status while
+      we are in a status that does not allow that, this will be noticed by
+      the changeStatusFromStatusAnalyzer method. This allows taking the
+      lock only when actually needed rather than at every sleep timeout.
+      */
+      if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold)
+      {
+        return ServerStatus.DEGRADED_STATUS;
+      }
+      // A degradedStatusThreshold of 0 means no degrading system is used
+      // (no threshold): force normal status
+ return ServerStatus.NORMAL_STATUS;
}
}
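Reviewer note: a condensed sketch of the decision this hunk produces
(hypothetical standalone method; it assumes the guard above the hunk tests
whether the RS advertises no generation id, i.e. rsGenId == -1):

    static ServerStatus computeStatus(long rsGenId, long dsGenId,
        int nChanges, int degradedStatusThreshold)
    {
      if (rsGenId == -1)
      {
        return ServerStatus.NORMAL_STATUS;   // RS has no generation id
      }
      if (rsGenId != dsGenId)
      {
        return ServerStatus.BAD_GEN_ID_STATUS;
      }
      if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold)
      {
        return ServerStatus.DEGRADED_STATUS; // too many changes late
      }
      return ServerStatus.NORMAL_STATUS;     // threshold 0 disables degrading
    }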
@@ -1472,20 +1462,16 @@
return bestServers.values().iterator().next();
}
- if (firstConnection)
- {
- // We are not connected to a server yet
- return computeBestServerForWeight(bestServers, -1, -1);
- } else
- {
- /*
- We are already connected to a RS: compute the best RS as far as the
- weights is concerned. If this is another one, some DS must
- disconnect.
- */
- return computeBestServerForWeight(bestServers, rsServerId,
- localServerId);
- }
+ if (firstConnection)
+ {
+ // We are not connected to a server yet
+ return computeBestServerForWeight(bestServers, -1, -1);
+ }
+ /*
+ * We are already connected to a RS: compute the best RS as far as the
+ * weights is concerned. If this is another one, some DS must disconnect.
+ */
+ return computeBestServerForWeight(bestServers, rsServerId, localServerId);
}
/**
@@ -1783,11 +1769,12 @@
sumOfWeights += replicationServerInfo.getWeight();
sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
}
+
// Distance (difference) of the current loads to the load goals of each RS:
// key:server id, value: distance
Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
// Precision for the operations (number of digits after the dot)
- MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
+ final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
for (Integer rsId : bestServers.keySet())
{
ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
@@ -1812,199 +1799,210 @@
if (currentRsServerId == -1)
{
- // The local server is not connected yet
+    // The local server is not connected yet: find the best server to
+    // connect to, taking the weights into account.
+ return computeBestServerWhenNotConnected(bestServers, loadDistances);
+ }
+  // The local server is currently connected to a RS: let's see whether it
+  // must disconnect or not, taking the weights into account.
+ return computeBestServerWhenConnected(bestServers, loadDistances,
+ localServerId, currentRsServerId, sumOfWeights, sumOfConnectedDSs);
+ }
+ private static ReplicationServerInfo computeBestServerWhenNotConnected(
+ Map<Integer, ReplicationServerInfo> bestServers,
+ Map<Integer, BigDecimal> loadDistances)
+ {
+ /*
+ * Find the server with the current highest distance to its load goal and
+ * choose it. Make an exception if every server is correctly balanced,
+ * that is every current load distances are equal to 0, in that case,
+ * choose the server with the highest weight
+ */
+    int bestRsId = 0; // If all servers are equal, return the first one
+ float highestDistance = Float.NEGATIVE_INFINITY;
+ boolean allRsWithZeroDistance = true;
+ int highestWeightRsId = -1;
+ int highestWeight = -1;
+ for (Integer rsId : bestServers.keySet())
+ {
+ float loadDistance = loadDistances.get(rsId).floatValue();
+ if (loadDistance > highestDistance)
+ {
+      // This server is farther from its balance point
+ bestRsId = rsId;
+ highestDistance = loadDistance;
+ }
+ if (loadDistance != 0)
+ {
+ allRsWithZeroDistance = false;
+ }
+ int weight = bestServers.get(rsId).getWeight();
+ if (weight > highestWeight)
+ {
+ // This server has a higher weight
+ highestWeightRsId = rsId;
+ highestWeight = weight;
+ }
+ }
+    // All servers with a 0 distance?
+ if (allRsWithZeroDistance)
+ {
+ // Choose server with the highest weight
+ bestRsId = highestWeightRsId;
+ }
+ return bestServers.get(bestRsId);
+ }
+
+ private static ReplicationServerInfo computeBestServerWhenConnected(
+ Map<Integer, ReplicationServerInfo> bestServers,
+ Map<Integer, BigDecimal> loadDistances, int localServerId,
+ int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
+ {
+ final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
+ float currentLoadDistance =
+ loadDistances.get(currentRsServerId).floatValue();
+ if (currentLoadDistance < 0)
+ {
/*
- * Find the server with the current highest distance to its load goal and
- * choose it. Make an exception if every server is correctly balanced,
- * that is every current load distances are equal to 0, in that case,
- * choose the server with the highest weight
- */
- int bestRsId = 0; // If all server equal, return the first one
- float highestDistance = Float.NEGATIVE_INFINITY;
- boolean allRsWithZeroDistance = true;
- int highestWeightRsId = -1;
- int highestWeight = -1;
+      Too many DSs are connected to the current RS compared with its load
+      goal:
+      determine the potential number of DSs to disconnect from the current
+      RS and see if the local DS is among them (the DSs that must
+      disconnect are those with the lowest server ids).
+      Compute the sum of the distances to the load goals of the other RSs
+      */
+ BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
for (Integer rsId : bestServers.keySet())
{
- float loadDistance = loadDistances.get(rsId).floatValue();
- if (loadDistance > highestDistance)
+ if (rsId != currentRsServerId)
{
- // This server is far more from its balance point
- bestRsId = rsId;
- highestDistance = loadDistance;
- }
- if (loadDistance != 0)
- {
- allRsWithZeroDistance = false;
- }
- int weight = bestServers.get(rsId).getWeight();
- if (weight > highestWeight)
- {
- // This server has a higher weight
- highestWeightRsId = rsId;
- highestWeight = weight;
+ sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
+ loadDistances.get(rsId), mathContext);
}
}
- // All servers with a 0 distance ?
- if (allRsWithZeroDistance)
- {
- // Choose server with the highest weight
- bestRsId = highestWeightRsId;
- }
- return bestServers.get(bestRsId);
- } else
- {
- // The local server is currently connected to a RS, let's see if it must
- // disconnect or not, taking the weights into account.
- float currentLoadDistance =
- loadDistances.get(currentRsServerId).floatValue();
- if (currentLoadDistance < 0)
+ if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
{
/*
- Too much DSs connected to the current RS, compared with its load
- goal:
- Determine the potential number of DSs to disconnect from the current
- RS and see if the local DS is part of them: the DSs that must
- disconnect are those with the lowest server id.
- Compute the sum of the distances of the load goals of the other RSs
+        The average distance of the other RSs shows a lack of DSs.
+        Compute the number of DSs to disconnect from the current RS,
+        rounding to the nearest integer. Only do this if there is
+        no risk of a yoyo effect: when the exact balance cannot be
+        established due to the current number of connected DSs, do not
+        disconnect a DS. A simple example where the balance cannot be
+        reached is:
+        - RS1 has weight 1 and 2 DSs
+        - RS2 has weight 1 and 1 DS
+        => disconnecting a DS from RS1 to reconnect it to RS2 would make no
+        sense as this would lead to the reverse situation. In that case,
+        the perfect balance cannot be reached and we must stick to the
+        current situation, otherwise the DS would keep moving between the 2
+        RSs.
+        */
- BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
- for (Integer rsId : bestServers.keySet())
+ float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
+ multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
+ .floatValue();
+ int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
+
+ // Avoid yoyo effect
+ if (overloadingDSsNumber == 1)
{
- if (rsId != currentRsServerId)
- {
- sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
- loadDistances.get(rsId), mathContext);
- }
- }
-
- if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
- {
- /*
- The average distance of the other RSs shows a lack of DSs.
- Compute the number of DSs to disconnect from the current RS,
- rounding to the nearest integer number. Do only this if there is
- no risk of yoyo effect: when the exact balance cannot be
- established due to the current number of DSs connected, do not
- disconnect a DS. A simple example where the balance cannot be
- reached is:
- - RS1 has weight 1 and 2 DSs
- - RS2 has weight 1 and 1 DS
- => disconnecting a DS from RS1 to reconnect it to RS2 would have no
- sense as this would lead to the reverse situation. In that case,
- the perfect balance cannot be reached and we must stick to the
- current situation, otherwise the DS would keep move between the 2
- RSs
- */
- float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
- multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
- .floatValue();
- int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
-
- // Avoid yoyo effect
- if (overloadingDSsNumber == 1)
- {
- // What would be the new load distance for the current RS if
- // we disconnect some DSs ?
- ReplicationServerInfo currentReplicationServerInfo =
- bestServers.get(currentRsServerId);
-
- int currentRsWeight = currentReplicationServerInfo.getWeight();
- BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
- BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
- BigDecimal currentRsLoadGoalBd =
- currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
- BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
- if (sumOfConnectedDSs != 0)
- {
- int connectedDSs = currentReplicationServerInfo.
- getConnectedDSNumber();
- BigDecimal potentialNewConnectedDSsBd =
- BigDecimal.valueOf(connectedDSs - 1);
- BigDecimal sumOfConnectedDSsBd =
- BigDecimal.valueOf(sumOfConnectedDSs);
- potentialCurrentRsNewLoadBd =
- potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
- mathContext);
- }
- BigDecimal potentialCurrentRsNewLoadDistanceBd =
- currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
- mathContext);
-
- // What would be the new load distance for the other RSs ?
- BigDecimal additionalDsLoadBd =
- BigDecimal.ONE.divide(
- BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
- BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
- sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
- mathContext);
-
- /*
- Now compare both values: we must no disconnect the DS if this
- is for going in a situation where the load distance of the other
- RSs is the opposite of the future load distance of the local RS
- or we would evaluate that we should disconnect just after being
- arrived on the new RS. But we should disconnect if we reach the
- perfect balance (both values are 0).
- */
- MathContext roundMc =
- new MathContext(6, RoundingMode.DOWN);
- BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
- potentialCurrentRsNewLoadDistanceBd.round(roundMc);
- BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
- potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
-
- if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
- BigDecimal.ZERO) != 0)
- && (potentialCurrentRsNewLoadDistanceBdRounded.equals(
- potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
- {
- // Avoid the yoyo effect, and keep the local DS connected to its
- // current RS
- return bestServers.get(currentRsServerId);
- }
- }
-
- // Prepare a sorted list (from lowest to highest) or DS server ids
- // connected to the current RS
- ReplicationServerInfo currentRsInfo =
+ // What would be the new load distance for the current RS if
+        // we disconnect some DSs?
+ ReplicationServerInfo currentReplicationServerInfo =
bestServers.get(currentRsServerId);
- List<Integer> serversConnectedToCurrentRS =
- currentRsInfo.getConnectedDSs();
- List<Integer> sortedServers = new ArrayList<Integer>(
- serversConnectedToCurrentRS);
- Collections.sort(sortedServers);
- // Go through the list of DSs to disconnect and see if the local
- // server is part of them.
- int index = 0;
- while (overloadingDSsNumber > 0)
+ int currentRsWeight = currentReplicationServerInfo.getWeight();
+ BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
+ BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
+ BigDecimal currentRsLoadGoalBd =
+ currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
+ BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
+ if (sumOfConnectedDSs != 0)
{
- int severToDisconnectId = sortedServers.get(index);
- if (severToDisconnectId == localServerId)
- {
- // The local server is part of the DSs to disconnect
- return null;
- }
- overloadingDSsNumber--;
- index++;
+ int connectedDSs = currentReplicationServerInfo.
+ getConnectedDSNumber();
+ BigDecimal potentialNewConnectedDSsBd =
+ BigDecimal.valueOf(connectedDSs - 1);
+ BigDecimal sumOfConnectedDSsBd =
+ BigDecimal.valueOf(sumOfConnectedDSs);
+ potentialCurrentRsNewLoadBd =
+ potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
+ mathContext);
}
+ BigDecimal potentialCurrentRsNewLoadDistanceBd =
+ currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
+ mathContext);
- // The local server is not part of the servers to disconnect from the
- // current RS.
- } else {
- // The average distance of the other RSs does not show a lack of DSs:
- // no need to disconnect any DS from the current RS.
+        // What would be the new load distance for the other RSs?
+ BigDecimal additionalDsLoadBd =
+ BigDecimal.ONE.divide(
+ BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
+ BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
+ sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
+ mathContext);
+
+        /*
+        Now compare both values: we must not disconnect the DS if this
+        would lead to a situation where the load distance of the other
+        RSs is the opposite of the future load distance of the local RS,
+        or we would conclude that we should disconnect again just after
+        arriving on the new RS. But we should disconnect if we reach the
+        perfect balance (both values are 0).
+        */
+ MathContext roundMc = new MathContext(6, RoundingMode.DOWN);
+ BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
+ potentialCurrentRsNewLoadDistanceBd.round(roundMc);
+ BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
+ potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
+
+ if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
+ BigDecimal.ZERO) != 0)
+ && (potentialCurrentRsNewLoadDistanceBdRounded.equals(
+ potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
+ {
+ // Avoid the yoyo effect, and keep the local DS connected to its
+ // current RS
+ return bestServers.get(currentRsServerId);
+ }
}
+
+      // Prepare a sorted list (from lowest to highest) of DS server ids
+ // connected to the current RS
+ ReplicationServerInfo currentRsInfo =
+ bestServers.get(currentRsServerId);
+ List<Integer> serversConnectedToCurrentRS =
+ new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
+ Collections.sort(serversConnectedToCurrentRS);
+
+ // Go through the list of DSs to disconnect and see if the local
+ // server is part of them.
+ int index = 0;
+ while (overloadingDSsNumber > 0)
+ {
+ int serverIdToDisconnect = serversConnectedToCurrentRS.get(index);
+ if (serverIdToDisconnect == localServerId)
+ {
+ // The local server is part of the DSs to disconnect
+ return null;
+ }
+ overloadingDSsNumber--;
+ index++;
+ }
+
+ // The local server is not part of the servers to disconnect from the
+ // current RS.
} else {
- // The RS load goal is reached or there are not enough DSs connected to
- // it to reach it: do not disconnect from this RS and return rsInfo for
- // this RS
+ // The average distance of the other RSs does not show a lack of DSs:
+ // no need to disconnect any DS from the current RS.
}
- return bestServers.get(currentRsServerId);
+ } else {
+ // The RS load goal is reached or there are not enough DSs connected to
+ // it to reach it: do not disconnect from this RS and return rsInfo for
+ // this RS
}
+ return bestServers.get(currentRsServerId);
}
/**
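Reviewer note: a worked pass through the load arithmetic used by the two
extracted methods (all numbers hypothetical). With RS1 at weight 2 and RS2
at weight 1, sumOfWeights is 3, so the load goals are 2/3 and 1/3. If all 3
connected DSs sit on RS1, the load distances (goal minus current load) are
2/3 - 3/3 = -1/3 for RS1 and 1/3 - 0 = +1/3 for RS2, and the number of DSs
to move off RS1 is 1/3 x 3 = 1. In the yoyo example from the comment above
(RS1 and RS2 both weight 1, holding 2 and 1 DSs), the distances are
1/2 - 2/3 = -1/6 and 1/2 - 1/3 = +1/6, so Math.round(1/6 x 3) =
Math.round(0.5f) = 1; that is precisely the overloadingDSsNumber == 1 case
where the extra rounded-balance comparison keeps the DS where it is.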
@@ -2117,7 +2115,7 @@
*/
public void publish(ReplicationMsg msg)
{
- _publish(msg, false, true);
+ publish(msg, false, true);
}
/**
@@ -2128,7 +2126,7 @@
*/
public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
{
- return _publish(msg, false, retryOnFailure);
+ return publish(msg, false, retryOnFailure);
}
/**
@@ -2137,7 +2135,7 @@
*/
public void publishRecovery(ReplicationMsg msg)
{
- _publish(msg, true, true);
+ publish(msg, true, true);
}
/**
@@ -2147,7 +2145,7 @@
* @param retryOnFailure whether retry should be done on failure
* @return whether the message was successfully sent.
*/
- boolean _publish(ReplicationMsg msg, boolean recoveryMsg,
+ private boolean publish(ReplicationMsg msg, boolean recoveryMsg,
boolean retryOnFailure)
{
boolean done = false;
@@ -2175,10 +2173,6 @@
try
{
- boolean credit;
- Session current_session;
- Semaphore currentWindowSemaphore;
-
/*
save the session at the time when we acquire the
sendwindow credit so that we can make sure later
@@ -2187,9 +2181,11 @@
on a session with a credit that was acquired from a previous
session.
*/
+ Session currentSession;
+ Semaphore currentWindowSemaphore;
synchronized (connectPhaseLock)
{
- current_session = session;
+ currentSession = session;
currentWindowSemaphore = sendWindow;
}
@@ -2204,6 +2200,7 @@
return false;
}
+ boolean credit;
if (msg instanceof UpdateMsg)
{
/*
@@ -2217,6 +2214,7 @@
{
credit = true;
}
+
if (credit)
{
synchronized (connectPhaseLock)
@@ -2228,8 +2226,7 @@
reconnection happened and we need to restart from scratch.
*/
- if ((session != null) &&
- (session == current_session))
+ if (session != null && session == currentSession)
{
session.publish(msg);
done = true;
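Reviewer note: the capture-then-recheck pattern above, as a minimal sketch.
The timed tryAcquire() call is an assumption (the actual acquisition is
outside this hunk); session, sendWindow and connectPhaseLock are the fields
from this patch; InterruptedException handling is elided.

    Session currentSession;
    Semaphore currentWindowSemaphore;
    synchronized (connectPhaseLock)
    {
      // Capture the session together with its window semaphore.
      currentSession = session;
      currentWindowSemaphore = sendWindow;
    }
    // Acquire the send-window credit outside the lock (assumed call).
    boolean credit =
        currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
    if (credit)
    {
      synchronized (connectPhaseLock)
      {
        // Publish only if no reconnection replaced the session meanwhile,
        // otherwise the credit was acquired from a previous session.
        if (session != null && session == currentSession)
        {
          session.publish(msg);
        }
      }
    }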
@@ -2340,7 +2337,7 @@
break;
}
- final int replicationServerID = rsServerId;
+ final int previousRsServerID = rsServerId;
try
{
ReplicationMsg msg = savedSession.receive();
@@ -2375,7 +2372,7 @@
{
// RS performs a proper disconnection
Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
- replicationServerID, savedSession.getReadableRemoteAddress(),
+ previousRsServerID, savedSession.getReadableRemoteAddress(),
serverId, baseDN.toNormalizedString());
logError(message);
@@ -2425,13 +2422,12 @@
{
// Stable topology (no topo msg since few seconds): proceed with
// best server checking.
- ReplicationServerInfo bestServerInfo =
- computeBestReplicationServer(false, rsServerId, state,
- replicationServerInfos, serverId, groupId,
- generationID);
-
- if ((rsServerId != -1) && ((bestServerInfo == null) ||
- (bestServerInfo.getServerId() != rsServerId)))
+ final ReplicationServerInfo bestServerInfo =
+ computeBestReplicationServer(false, previousRsServerID, state,
+ replicationServerInfos, serverId, groupId, generationID);
+ if (previousRsServerID != -1
+ && (bestServerInfo == null
+ || bestServerInfo.getServerId() != previousRsServerID))
{
// The best replication server is no more the one we are
// currently using. Disconnect properly then reconnect.
@@ -2439,14 +2435,14 @@
if (bestServerInfo == null)
{
message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
- serverId, replicationServerID,
+ serverId, previousRsServerID,
savedSession.getReadableRemoteAddress(),
baseDN.toNormalizedString());
}
else
{
message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
- serverId, replicationServerID,
+ serverId, previousRsServerID,
savedSession.getReadableRemoteAddress(),
bestServerInfo.getServerId(),
baseDN.toNormalizedString());
@@ -2483,7 +2479,7 @@
{
// We did not initiate the close on our side, log an error message.
Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
- serverId, baseDN.toNormalizedString(), replicationServerID,
+ serverId, baseDN.toNormalizedString(), previousRsServerID,
savedSession.getReadableRemoteAddress());
logError(message);
}
@@ -2684,9 +2680,8 @@
* requires to restart the service.
* @param groupId The new group id to use
*/
- public boolean changeConfig(
- Collection<String> replicationServers, int window, long heartbeatInterval,
- byte groupId)
+ public boolean changeConfig(Set<String> replicationServers, int window,
+ long heartbeatInterval, byte groupId)
{
// These parameters needs to be renegotiated with the ReplicationServer
// so if they have changed, that requires restarting the session with
@@ -2695,8 +2690,7 @@
// the connection is modified
boolean needToRestartSession =
this.replicationServerUrls == null
- || replicationServers.size() != this.replicationServerUrls.size()
- || !replicationServers.containsAll(this.replicationServerUrls)
+ || !replicationServers.equals(this.replicationServerUrls)
|| window != this.maxRcvWindow
|| heartbeatInterval != this.heartbeatInterval
|| groupId != this.groupId;
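Reviewer note: Set.equals() is specified as "same size and containsAll()",
so it matches the replaced two-part test exactly; and since the parameter is
now a Set rather than a Collection, duplicates can no longer skew the size
comparison. A minimal illustration (standalone snippet, hypothetical values):

    Set<String> oldUrls = new HashSet<String>(
        Arrays.asList("rs1:8989", "rs2:8989"));
    Set<String> newUrls = new HashSet<String>(
        Arrays.asList("rs2:8989", "rs1:8989"));
    // Old test: same size plus one-direction containsAll().
    boolean sameOldWay = newUrls.size() == oldUrls.size()
        && newUrls.containsAll(oldUrls);
    // New test: equivalent for sets, so no needless session restart.
    boolean sameNewWay = newUrls.equals(oldUrls);
    // Both are true here.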
@@ -2788,12 +2782,10 @@
*/
public List<RSInfo> getRsList()
{
- List<RSInfo> result = new ArrayList<RSInfo>();
-
- for (ReplicationServerInfo replicationServerInfo :
- replicationServerInfos.values())
+ final List<RSInfo> result = new ArrayList<RSInfo>();
+ for (ReplicationServerInfo rsInfo : replicationServerInfos.values())
{
- result.add(replicationServerInfo.toRSInfo());
+ result.add(rsInfo.toRSInfo());
}
return result;
}
@@ -2989,14 +2981,14 @@
}
/**
- * Returns the replication monitor associated with this broker.
+ * Returns the replication monitor instance name associated with this broker.
*
- * @return The replication monitor.
+ * @return The replication monitor instance name.
*/
- ReplicationMonitor getReplicationMonitor()
+ String getReplicationMonitorInstanceName()
{
// Only invoked by replication domain so always non-null.
- return monitor;
+ return monitor.getMonitorInstanceName();
}
private void setSession(final Session newSession)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 38bd31d..5db7c3b 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -1478,7 +1478,7 @@
}
if (debugEnabled())
- TRACER.debugInfo("[IE] In " + getReplicationMonitorInstanceName()
+ TRACER.debugInfo("[IE] In " + broker.getReplicationMonitorInstanceName()
+ " export ends with " + " connected=" + broker.isConnected()
+ " exportRootException=" + exportRootException);
@@ -1585,11 +1585,6 @@
}
}
- private String getReplicationMonitorInstanceName()
- {
- return broker.getReplicationMonitor().getMonitorInstanceName();
- }
-
/**
* For all remote servers in the start list:
* - wait it has finished the import and present the expected generationID,
@@ -1835,7 +1830,8 @@
msg = broker.receive(false, false, true);
if (debugEnabled())
- TRACER.debugInfo("[IE] In " + getReplicationMonitorInstanceName()
+ TRACER.debugInfo("[IE] In "
+ + broker.getReplicationMonitorInstanceName()
+ ", receiveEntryBytes " + msg);
if (msg == null)
@@ -1888,7 +1884,7 @@
broker.publish(amsg, false);
if (debugEnabled())
TRACER.debugInfo("[IE] In "
- + getReplicationMonitorInstanceName()
+ + broker.getReplicationMonitorInstanceName()
+ ", publish InitializeRcvAckMsg" + amsg);
}
}
@@ -2072,13 +2068,12 @@
TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry="
+ Arrays.toString(lDIFEntry));
- // publish the message
boolean sent = broker.publish(entryMessage, false);
// process any publish error
- if (((!sent)||
- (broker.hasConnectionError()))||
- (broker.getNumLostConnections() != ieContext.initNumLostConnections))
+ if (!sent
+ || broker.hasConnectionError()
+ || broker.getNumLostConnections() != ieContext.initNumLostConnections)
{
// publish failed - store the error in the ieContext ...
DirectoryException de = new DirectoryException(ResultCode.OTHER,
@@ -2125,8 +2120,7 @@
* @throws DirectoryException If it was not possible to publish the
* Initialization message to the Topology.
*/
- public void initializeFromRemote(int source)
- throws DirectoryException
+ public void initializeFromRemote(int source) throws DirectoryException
{
initializeFromRemote(source, null);
}
@@ -2966,8 +2960,7 @@
* @throws ConfigException If the DirectoryServer configuration was
* incorrect.
*/
- public void startPublishService(
- Collection<String> replicationServers, int window,
+ public void startPublishService(Set<String> replicationServers, int window,
long heartbeatInterval, long changetimeHeartbeatInterval)
throws ConfigException
{
@@ -3078,18 +3071,15 @@
/**
* Change some ReplicationDomain parameters.
*
- * @param replicationServers The new list of Replication Servers that this
+ * @param replicationServers The new set of Replication Servers that this
* domain should now use.
* @param windowSize The window size that this domain should use.
* @param heartbeatInterval The heartbeatInterval that this domain should
* use.
* @param groupId The new group id to use
*/
- public void changeConfig(
- Collection<String> replicationServers,
- int windowSize,
- long heartbeatInterval,
- byte groupId)
+ public void changeConfig(Set<String> replicationServers, int windowSize,
+ long heartbeatInterval, byte groupId)
{
this.groupId = groupId;
@@ -3576,15 +3566,13 @@
Set<String> s2 = new HashSet<String>(s1);
s2.addAll(includeAttributesForDeletes);
- Set<String> s = eclIncludesByServer.get(serverId);
- if (!s1.equals(s))
+ if (!s1.equals(eclIncludesByServer.get(serverId)))
{
configurationChanged = true;
eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
}
- s = eclIncludesForDeletesByServer.get(serverId);
- if (!s2.equals(s))
+ if (!s2.equals(eclIncludesForDeletesByServer.get(serverId)))
{
configurationChanged = true;
eclIncludesForDeletesByServer.put(serverId,
@@ -3592,7 +3580,7 @@
}
// and rebuild the global list to be ready for usage
- s = new HashSet<String>();
+ Set<String> s = new HashSet<String>();
for (Set<String> attributes : eclIncludesByServer.values())
{
s.addAll(attributes);
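Reviewer note: a worked view of the rebuild above (hypothetical values). If
server 1 declares includes {sn} and server 2 declares {cn, sn}, the
per-server map keeps each unmodifiable set as-is, while the global set s
becomes their union {cn, sn}; the same union logic then repeats for the
for-deletes variant.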
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
index 0a20a5d..67ee569 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -1922,4 +1922,20 @@
dsconfig("set-backend-prop", "--backend-name", backendID,
"--set", "enabled:" + enabled);
}
+
+ public static <T> Set<T> newSet(T... elems)
+ {
+ return new HashSet<T>(Arrays.asList(elems));
+ }
+
+ public static <T> SortedSet<T> newSortedSet(T... elems)
+ {
+ return new TreeSet<T>(Arrays.asList(elems));
+ }
+
+ public static <T> List<T> newList(T... elems)
+ {
+ return new ArrayList<T>(Arrays.asList(elems));
+ }
+
}
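A usage sketch of the new helpers (call sites hypothetical):

    Set<String> urls = newSet("localhost:8989", "localhost:8990");
    SortedSet<String> attrs = newSortedSet("sn", "cn"); // TreeSet, natural order
    List<String> ids = newList("a", "b", "a");          // duplicates kept

Note that newList() copies into a real ArrayList, so the result is
resizable, unlike the fixed-size Arrays.asList() view returned by the
helper removed from SynchronizationMsgTest below.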
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 5f2b87b..6c0827f 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -236,7 +236,7 @@
private void connect(ReplicationBroker broker, int port, int timeout) throws Exception
{
- broker.start(Collections.singletonList("localhost:" + port));
+ broker.start(Collections.singleton("localhost:" + port));
// give some time to the broker to connect to the replicationServer.
checkConnection(30, broker, port);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index 90e9d35..3e9fc20 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -375,8 +375,7 @@
private void createFakeReplicationDomain(boolean firstBackend,
long generationId) throws Exception
{
- List<String> replicationServers = new ArrayList<String>();
- replicationServers.add("localhost:" + replServerPort);
+ Set<String> replicationServers = newSet("localhost:" + replServerPort);
DN baseDN = DN.decode(firstBackend ? TEST_ROOT_DN_STRING : TEST2_ROOT_DN_STRING);
replicationDomain = new FakeReplicationDomain(baseDN, DS2_ID, replicationServers, 100, 1000, generationId);
@@ -566,13 +565,9 @@
private int exportedEntryCount;
private long generationID = -1;
- public FakeReplicationDomain(
- DN baseDN,
- int serverID,
- Collection<String> replicationServers,
- int window,
- long heartbeatInterval,
- long generationId) throws ConfigException
+ public FakeReplicationDomain(DN baseDN, int serverID,
+ Set<String> replicationServers, int window, long heartbeatInterval,
+ long generationId) throws ConfigException
{
super(baseDN, serverID, 100);
generationID = generationId;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
index 3a62970..48f26fd 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
@@ -29,10 +29,7 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Category;
@@ -229,9 +226,7 @@
ReplSessionSecurity security = new ReplSessionSecurity(null, null, null, true);
ReplicationBroker broker = new ReplicationBroker(null, state, EXAMPLE_DN_,
dsId, 100, generationId, 0, security, (byte) 1, 500);
- List<String> servers = new ArrayList<String>(1);
- servers.add("localhost:" + rs1Port);
- broker.start(servers);
+ broker.start(Collections.singleton("localhost:" + rs1Port));
checkConnection(30, broker, rs1Port);
return broker;
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 1a3ae3d..0984efe 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -904,16 +904,6 @@
};
}
- private <T> Set<T> newSet(T... elems)
- {
- return new HashSet<T>(Arrays.asList(elems));
- }
-
- private <T> List<T> newList(T... elems)
- {
- return Arrays.asList(elems);
- }
-
/**
* Test TopologyMsg encoding and decoding.
*/
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 6ed4021..079a806 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -300,18 +300,16 @@
(byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout,
scenario, serverState);
- List<String> replicationServers = new ArrayList<String>();
- replicationServers.add("localhost:" + rsPort);
+ Set<String> replicationServers = newSet("localhost:" + rsPort);
fakeReplicationDomain.startPublishService(replicationServers, window, 1000, 500);
if (startListen)
fakeReplicationDomain.startListenService();
// Test connection
assertTrue(fakeReplicationDomain.isConnected());
- // Check connected server port
- HostPort rd =
- HostPort.valueOf(fakeReplicationDomain.getReplicationServer());
- assertEquals(rd.getPort(), rsPort);
+ // Check connected server port
+ HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer());
+ assertEquals(rd.getPort(), rsPort);
return fakeReplicationDomain;
}
@@ -1253,17 +1251,9 @@
// Fake RS 3 scenario
objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO);
- Object[][] result = new Object[objectArrayList.size()][];
- int i = 0;
- for (List<Object> objectArray : objectArrayList)
- {
- result[i] = objectArray.toArray();
- i++;
- }
-
- debugInfo("testSafeDataLevelHighProvider: number of possible parameter combinations : " + i);
-
- return result;
+ debugInfo("testSafeDataLevelHighProvider: number of possible parameter combinations : "
+ + objectArrayList.size());
+ return toDataProvider(objectArrayList);
}
/**
@@ -1862,11 +1852,16 @@
// Fake RS sends update in assured mode
objectArrayList = addPossibleParameters(objectArrayList, true, false);
- Object[][] result = new Object[objectArrayList.size()][];
+ return toDataProvider(objectArrayList);
+ }
+
+ private Object[][] toDataProvider(List<List<Object>> listOfList)
+ {
+ Object[][] result = new Object[listOfList.size()][];
int i = 0;
- for (List<Object> objectArray : objectArrayList)
+ for (List<Object> list : listOfList)
{
- result[i] = objectArray.toArray();
+ result[i] = list.toArray();
i++;
}
return result;
@@ -2292,14 +2287,7 @@
// Other additional RS scenario
objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO, DS_TIMEOUT_RS_SCENARIO_SAFE_READ, DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ, DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ);
- Object[][] result = new Object[objectArrayList.size()][];
- int i = 0;
- for (List<Object> objectArray : objectArrayList)
- {
- result[i] = objectArray.toArray();
- i++;
- }
- return result;
+ return toDataProvider(objectArrayList);
}
/**
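A sketch of how the extracted helper is consumed by the TestNG providers
(the @DataProvider wiring shown here is assumed, not part of this hunk):

    @DataProvider(name = "testSafeDataLevelHighProvider")
    public Object[][] testSafeDataLevelHighProvider()
    {
      List<List<Object>> combos = new ArrayList<List<Object>>();
      combos = addPossibleParameters(combos, REPLY_OK_RS_SCENARIO,
          TIMEOUT_RS_SCENARIO);
      return toDataProvider(combos); // each inner List becomes one Object[] row
    }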
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index db041f4..2360491 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -616,7 +616,7 @@
// Add the root entry in the backend
backend2 = initializeTestBackend(false, backendId2);
backend2.setPrivateBackend(true);
- SortedSet<String> replServers = newSet("localhost:" + replicationServerPort);
+ SortedSet<String> replServers = newSortedSet("localhost:" + replicationServerPort);
DomainFakeCfg domainConf = new DomainFakeCfg(baseDN2, 1602, replServers);
domain2 = startNewDomain(domainConf, null,null);
@@ -2870,10 +2870,10 @@
// Add the root entry in the backend
backend2 = initializeTestBackend(false, TEST_BACKEND_ID2);
- SortedSet<String> replServers = newSet("localhost:" + replicationServerPort);
+ SortedSet<String> replServers = newSortedSet("localhost:" + replicationServerPort);
// on o=test2,sid=1702 include attrs set to : 'sn'
- SortedSet<String> eclInclude = newSet("sn", "roomnumber");
+ SortedSet<String> eclInclude = newSortedSet("sn", "roomnumber");
DomainFakeCfg domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1702, replServers);
domain2 = startNewDomain(domainConf, eclInclude, eclInclude);
@@ -2881,15 +2881,15 @@
backend3 = initializeTestBackend(false, backendId3);
// on o=test3,sid=1703 include attrs set to : 'objectclass'
- eclInclude = newSet("objectclass");
+ eclInclude = newSortedSet("objectclass");
- SortedSet<String> eclIncludeForDeletes = newSet("*");
+ SortedSet<String> eclIncludeForDeletes = newSortedSet("*");
domainConf = new DomainFakeCfg(baseDN3, 1703, replServers);
domain3 = startNewDomain(domainConf, eclInclude, eclIncludeForDeletes);
// on o=test2,sid=1704 include attrs set to : 'cn'
- eclInclude = newSet("cn");
+ eclInclude = newSortedSet("cn");
domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1704, replServers);
domain21 = startNewDomain(domainConf, eclInclude, eclInclude);
@@ -3021,11 +3021,6 @@
}
}
- private static SortedSet<String> newSet(String... values)
- {
- return new TreeSet<String>(Arrays.asList(values));
- }
-
private LDAPReplicationDomain startNewDomain(DomainFakeCfg domainConf,
SortedSet<String> eclInclude, SortedSet<String> eclIncludeForDeletes)
throws Exception
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
index eb55ed1..08cc1b1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -30,7 +30,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Collection;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -68,12 +68,8 @@
private long generationID = 1;
- public FakeReplicationDomain(
- DN baseDN,
- int serverID,
- Collection<String> replicationServers,
- int window,
- long heartbeatInterval,
+ public FakeReplicationDomain(DN baseDN, int serverID,
+ Set<String> replicationServers, int window, long heartbeatInterval,
BlockingQueue<UpdateMsg> queue) throws ConfigException
{
super(baseDN, serverID, 100);
@@ -82,15 +78,10 @@
this.queue = queue;
}
- public FakeReplicationDomain(
- DN baseDN,
- int serverID,
- Collection<String> replicationServers,
- int window,
- long heartbeatInterval,
- String exportString,
- StringBuilder importString,
- int exportedEntryCount) throws ConfigException
+ public FakeReplicationDomain(DN baseDN, int serverID,
+ Set<String> replicationServers, int window, long heartbeatInterval,
+ String exportString, StringBuilder importString, int exportedEntryCount)
+ throws ConfigException
{
super(baseDN, serverID, 100);
startPublishService(replicationServers, window, heartbeatInterval, 500);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
index 26109f8..87148d7 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -27,12 +27,10 @@
*/
package org.opends.server.replication.service;
-import static org.opends.messages.ReplicationMessages.*;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Collection;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,6 +40,8 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
+import static org.opends.messages.ReplicationMessages.*;
+
/**
* This class is the minimum implementation of a Concrete ReplicationDomain
* used to test the Generic Replication Service.
@@ -55,12 +55,8 @@
*/
private BlockingQueue<UpdateMsg> queue = null;
- public FakeStressReplicationDomain(
- DN baseDN,
- int serverID,
- Collection<String> replicationServers,
- int window,
- long heartbeatInterval,
+ public FakeStressReplicationDomain(DN baseDN, int serverID,
+ Set<String> replicationServers, int window, long heartbeatInterval,
BlockingQueue<UpdateMsg> queue) throws ConfigException
{
super(baseDN, serverID, 100);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 5468413..3c8190c 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -45,6 +45,7 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.opends.server.TestCaseUtils.*;
import static org.testng.Assert.*;
/**
@@ -92,16 +93,13 @@
replServer2 = createReplicationServer(replServerID2, replServerPort2,
"ReplicationDomainTestDb2", 100, "localhost:" + replServerPort1);
- List<String> servers = new ArrayList<String>(1);
- servers.add("localhost:" + replServerPort1);
+ Set<String> servers = newSet("localhost:" + replServerPort1);
BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
domain1 = new FakeReplicationDomain(
testService, domain1ServerId, servers, 100, 1000, rcvQueue1);
- List<String> servers2 = new ArrayList<String>(1);
- servers2.add("localhost:" + replServerPort2);
-
+ Set<String> servers2 = newSet("localhost:" + replServerPort2);
BlockingQueue<UpdateMsg> rcvQueue2 = new LinkedBlockingQueue<UpdateMsg>();
domain2 = new FakeReplicationDomain(
testService, domain2ServerId, servers2, 100, 1000, rcvQueue2);
@@ -218,9 +216,7 @@
replServer1 = createReplicationServer(replServerID1, replServerPort,
"ReplicationDomainTestDb", 100000, "localhost:" + replServerPort);
- List<String> servers = new ArrayList<String>(1);
- servers.add("localhost:" + replServerPort);
-
+ Set<String> servers = newSet("localhost:" + replServerPort);
BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
domain1 = new FakeReplicationDomain(
testService, domain1ServerId, servers, 1000, 100000, rcvQueue1);
@@ -321,8 +317,7 @@
replServer = createReplicationServer(replServerID, replServerPort,
"exportAndImportData", 100);
- List<String> servers = new ArrayList<String>(1);
- servers.add("localhost:" + replServerPort);
+ Set<String> servers = newSet("localhost:" + replServerPort);
StringBuilder exportedDataBuilder = new StringBuilder();
for (int i =0; i<ENTRYCOUNT; i++)
@@ -399,11 +394,8 @@
replServer2 = createReplicationServer(replServerID2, replServerPort2,
"exportAndImportservice2", 100, "localhost:" + replServerPort1);
- List<String> servers1 = new ArrayList<String>(1);
- servers1.add("localhost:" + replServerPort1);
-
- List<String> servers2 = new ArrayList<String>(1);
- servers2.add("localhost:" + replServerPort2);
+ Set<String> servers1 = newSet("localhost:" + replServerPort1);
+ Set<String> servers2 = newSet("localhost:" + replServerPort2);
StringBuilder exportedDataBuilder = new StringBuilder();
for (int i =0; i<ENTRYCOUNT; i++)
--
Gitblit v1.10.0