From b4c27fccb2913620731a9296b04baccb69846ac7 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Tue, 19 Jan 2010 09:53:03 +0000
Subject: [PATCH] This is about refactoring the way the directory server chooses the replication server it will connect to. This also introduces a new (weighed) load balancing feature that spreads DS connections across the RSs, according to the RS weights defined by the administrator,
---
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 1620 +++++++++++++-------
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 10
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 4
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java | 8
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 5
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 22
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java | 4
opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java | 31
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java | 4
opendj-sdk/opends/src/messages/messages/replication.properties | 5
opendj-sdk/opends/src/server/org/opends/server/replication/common/RSInfo.java | 40
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java | 1318 ++++++++++++++++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java | 20
opendj-sdk/opends/src/server/org/opends/server/replication/common/DSInfo.java | 4
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java | 29
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java | 2
opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml | 14
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 2
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java | 1494 ++++++++++++++----
21 files changed, 3,640 insertions(+), 1,002 deletions(-)
diff --git a/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml b/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
index ac060ce..5b3fcd4 100644
--- a/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
+++ b/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -23,7 +23,7 @@
! CDDL HEADER END
!
!
- ! Copyright 2007-2009 Sun Microsystems, Inc.
+ ! Copyright 2007-2010 Sun Microsystems, Inc.
! -->
<adm:managed-object name="replication-server"
plural-name="replication-servers"
@@ -219,8 +219,8 @@
The timeout value when waiting for assured mode acknowledgments.
</adm:synopsis>
<adm:description>
- Defines the amount of milliseconds the replication server will wait for
- assured acknowledgments (in either Safe Data or Safe Read assured sub
+ Defines the number of milliseconds that the replication server will wait
+ for assured acknowledgments (in either Safe Data or Safe Read assured sub
modes) before forgetting them and answer to the entity that sent an update
and is waiting for acknowledgment.
</adm:description>
@@ -288,7 +288,7 @@
</adm:defined>
</adm:default-behavior>
<adm:syntax>
- <adm:integer lower-limit="0"></adm:integer>
+ <adm:integer lower-limit="1"></adm:integer>
</adm:syntax>
<adm:profile name="ldap">
<ldap:attribute>
@@ -301,9 +301,9 @@
The period between sending of monitoring messages.
</adm:synopsis>
<adm:description>
- Defines the amount of milliseconds the replication server will wait before
- sending new monitoring messages to its peers (replication servers and
- directory servers).
+ Defines the number of milliseconds that the replication server will wait
+ before sending new monitoring messages to its peers (replication servers
+ and directory servers).
</adm:description>
<adm:default-behavior>
<adm:defined>
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index 7a9d24e..49b96c5 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -20,7 +20,7 @@
#
# CDDL HEADER END
#
-# Copyright 2006-2008 Sun Microsystems, Inc.
+# Copyright 2006-2010 Sun Microsystems, Inc.
#
#
# This file contains the primary Directory Server configuration. It must not
@@ -448,3 +448,6 @@
required. Reason: The provided cookie is older than the start of historical \
in the server for the replicated domain : %s
SEVERE_ERR_INVALID_COOKIE_SYNTAX_187=Invalid syntax of the provided cookie
+NOTICE_NEW_BEST_REPLICATION_SERVER_188=Domain %s (server id: %s) : \
+ disconnecting from this replication server (server id: %s, url: %s) : as a \
+ new one is more suitable
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/DSInfo.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/DSInfo.java
index aeb1835..fe55063 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/DSInfo.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/DSInfo.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2008-2009 Sun Microsystems, Inc.
+ * Copyright 2008-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.common;
@@ -245,7 +245,7 @@
public String toString()
{
StringBuffer sb = new StringBuffer();
- sb.append("DS id: ");
+ sb.append("\nDS id: ");
sb.append(dsId);
sb.append(" ; RS id: ");
sb.append(rsId);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/RSInfo.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/RSInfo.java
index 577ffd3..1ce6d95 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/RSInfo.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/RSInfo.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2008-2009 Sun Microsystems, Inc.
+ * Copyright 2008-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.common;
@@ -42,21 +42,26 @@
private byte groupId = (byte) -1;
// The weight of the RS
// It is important to keep the default value to 1 so that it is used as
- // default value for a RS using protocol V3: this default value vill be used
+ // default value for a RS using protocol V3: this default value will be used
// in algorithms that use weight
private int weight = 1;
+ // The server URL of the RS
+ private String serverUrl = null;
/**
* Creates a new instance of RSInfo with every given info.
*
* @param id The RS id
+ * @param serverUrl Url of the RS
* @param generationId The generation id the RS is using
* @param groupId RS group id
* @param weight RS weight
*/
- public RSInfo(int id, long generationId, byte groupId, int weight)
+ public RSInfo(int id, String serverUrl,
+ long generationId, byte groupId, int weight)
{
this.id = id;
+ this.serverUrl = serverUrl;
this.generationId = generationId;
this.groupId = groupId;
this.weight = weight;
@@ -117,7 +122,10 @@
return ((id == rsInfo.getId()) &&
(generationId == rsInfo.getGenerationId()) &&
(groupId == rsInfo.getGroupId()) &&
- (weight == rsInfo.getWeight()));
+ (weight == rsInfo.getWeight()) &&
+ (((serverUrl == null) && (rsInfo.getServerUrl() == null)) ||
+ ((serverUrl != null) && (rsInfo.getServerUrl() != null) &&
+ (serverUrl.equals(rsInfo.getServerUrl())))));
} else
{
return false;
@@ -131,15 +139,25 @@
@Override
public int hashCode()
{
- int hash = 5;
- hash = 37 * hash + this.id;
- hash = 37 * hash + (int) (this.generationId ^ (this.generationId >>> 32));
- hash = 37 * hash + this.groupId;
- hash = 37 * hash + this.weight;
+ int hash = 7;
+ hash = 17 * hash + this.id;
+ hash = 17 * hash + (int) (this.generationId ^ (this.generationId >>> 32));
+ hash = 17 * hash + this.groupId;
+ hash = 17 * hash + this.weight;
+ hash = 17 * hash + (this.serverUrl != null ? this.serverUrl.hashCode() : 0);
return hash;
}
/**
+ * Gets the server URL.
+ * @return the serverUrl
+ */
+ public String getServerUrl()
+ {
+ return serverUrl;
+ }
+
+ /**
* Returns a string representation of the DS info.
* @return A string representation of the DS info
*/
@@ -147,8 +165,10 @@
public String toString()
{
StringBuffer sb = new StringBuffer();
- sb.append("Id: ");
+ sb.append("\nId: ");
sb.append(id);
+ sb.append(" ; Server URL: ");
+ sb.append(serverUrl);
sb.append(" ; Generation id: ");
sb.append(generationId);
sb.append(" ; Group id: ");
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
index 61df06a..f487638 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2009 Sun Microsystems, Inc.
+ * Copyright 2006-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.common;
@@ -126,7 +126,7 @@
* of a server state.
*
* @param in the byte array where to calculate the string.
- * @param pos the position whre to start from in the byte array.
+ * @param pos the position where to start from in the byte array.
* @return the length of the next string.
* @throws DataFormatException If the byte array does not end with null.
*/
@@ -174,6 +174,33 @@
}
/**
+ * Update the Server State with a Server State. Every change number of this
+ * object is updated with the change number of the passed server state if
+ * it is newer.
+ *
+ * @param serverState the server state to use for the update.
+ *
+ * @return a boolean indicating if the update was meaningful.
+ */
+ public boolean update(ServerState serverState)
+ {
+ if (serverState == null)
+ return false;
+
+ boolean updated = false;
+
+ for (ChangeNumber cn : serverState.list.values())
+ {
+ if (update(cn))
+ {
+ updated = true;
+ }
+ }
+
+ return updated;
+ }
+
+ /**
* Replace the Server State with another ServerState.
*
* @param serverState The ServerState.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
index b2f095c..21b0714 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2009 Sun Microsystems, Inc.
+ * Copyright 2006-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -59,6 +59,7 @@
* ECL entry attributes.
* - Modified algorithm for choosing a RS to connect to: introduction of a
* ReplicationServerDSMsg message.
+ * -> also added of the server URL in RSInfo of TopologyMsg
* - Introduction of a StopMsg for proper connections ending.
*/
public static final short REPLICATION_PROTOCOL_V4 = 4;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
index fc735ec..dc572e3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2009 Sun Microsystems, Inc.
+ * Copyright 2009-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -80,7 +80,7 @@
* This specifies that the request on the ECL is a PERSISTENT search
* with changesOnly = false.
*/
- public final static short PERSISTENT_CHANGES_ONLY = 2;;
+ public final static short PERSISTENT_CHANGES_ONLY = 2;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index 85a4d1d..c2c777b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2007-2009 Sun Microsystems, Inc.
+ * Copyright 2007-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -198,6 +198,10 @@
if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
+ // Put server URL
+ oStream.write(rsInfo.getServerUrl().getBytes("UTF-8"));
+ oStream.write(0);
+
// Put RS weight
oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8"));
oStream.write(0);
@@ -242,8 +246,7 @@
int length = getNextLength(in, pos);
String serverIdString = new String(in, pos, length, "UTF-8");
int dsId = Integer.valueOf(serverIdString);
- pos +=
- length + 1;
+ pos += length + 1;
/* Read RS id */
length =
@@ -251,16 +254,14 @@
serverIdString =
new String(in, pos, length, "UTF-8");
int rsId = Integer.valueOf(serverIdString);
- pos +=
- length + 1;
+ pos += length + 1;
/* Read the generation id */
length = getNextLength(in, pos);
long generationId =
Long.valueOf(new String(in, pos, length,
"UTF-8"));
- pos +=
- length + 1;
+ pos += length + 1;
/* Read DS status */
ServerStatus status = ServerStatus.valueOf(in[pos++]);
@@ -296,8 +297,7 @@
length = getNextLength(in, pos);
String url = new String(in, pos, length, "UTF-8");
refUrls.add(url);
- pos +=
- length + 1;
+ pos += length + 1;
nRead++;
}
@@ -314,8 +314,7 @@
length = getNextLength(in, pos);
String attr = new String(in, pos, length, "UTF-8");
attrs.add(attr);
- pos +=
- length + 1;
+ pos += length + 1;
nRead++;
}
}
@@ -353,8 +352,13 @@
byte groupId = in[pos++];
int weight = 1;
+ String serverUrl = null;
if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
+ length = getNextLength(in, pos);
+ serverUrl = new String(in, pos, length, "UTF-8");
+ pos += length + 1;
+
/* Read RS weight */
length = getNextLength(in, pos);
weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
@@ -363,7 +367,8 @@
/* Now create RSInfo and store it in list */
- RSInfo rsInfo = new RSInfo(id, generationId, groupId, weight);
+ RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
+ weight);
rsList.add(rsInfo);
nRsInfo--;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 056667d..954da42 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2009 Sun Microsystems, Inc.
+ * Copyright 2006-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
@@ -265,6 +265,7 @@
throw new ConfigException(msg, e);
}
groupId = (byte)configuration.getGroupId();
+ weight = configuration.getWeight();
assuredTimeout = configuration.getAssuredTimeout();
degradedStatusThreshold = configuration.getDegradedStatusThreshold();
monitoringPublisherPeriod = configuration.getMonitoringPeriod();
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 118d73d..67673d1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2009 Sun Microsystems, Inc.
+ * Copyright 2006-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
@@ -107,7 +107,7 @@
public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
{
private final String baseDn;
- // The Status analyzer that periodically verifis if the connected DSs are
+ // The Status analyzer that periodically verifies if the connected DSs are
// late or not
private StatusAnalyzer statusAnalyzer = null;
@@ -744,7 +744,7 @@
// Change the number of expected acks if not enough available eligible
// servers: the level is a best effort thing, we do not want to timeout
// at every assured SD update for instance if a RS has had his gen id
- // resetted
+ // reseted
byte finalSdl = ((nExpectedServers >= neededAdditionalServers) ?
(byte)sdl : // Keep level as it was
(byte)(nExpectedServers+1)); // Change level to match what's available
@@ -823,7 +823,7 @@
}
} else
{
- // The timeout occured for the update matching this change number and the
+ // The timeout occurred for the update matching this change number and the
// ack with timeout error has probably already been sent.
}
}
@@ -2026,8 +2026,8 @@
// Create info for the local RS
List<RSInfo> rsInfos = new ArrayList<RSInfo>();
RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
- generationId, replicationServer.getGroupId(),
- replicationServer.getWeight());
+ replicationServer.getServerURL(), generationId,
+ replicationServer.getGroupId(), replicationServer.getWeight());
rsInfos.add(localRSInfo);
return new TopologyMsg(dsInfos, rsInfos);
@@ -2040,7 +2040,7 @@
* Also put info related to local RS.
*
* @param destDsId The id of the DS the TopologyMsg PDU is to be sent to and
- * that we must not include in the list DS list.
+ * that we must not include in the DS list.
* @return A suitable TopologyMsg PDU to be sent to a peer DS
*/
public TopologyMsg createTopologyMsgForDS(int destDsId)
@@ -2058,8 +2058,8 @@
// Add our own info (local RS)
RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
- generationId, replicationServer.getGroupId(),
- replicationServer.getWeight());
+ replicationServer.getServerURL(), generationId,
+ replicationServer.getGroupId(), replicationServer.getWeight());
rsInfos.add(localRSInfo);
// Go through every peer RSs (and get their connected DSs), also add info
@@ -2502,7 +2502,7 @@
*/
if (allowResetGenId)
{
- // Check if generation id has to be resetted
+ // Check if generation id has to be reseted
mayResetGenerationId();
if (generationId < 0)
generationId = handler.getGenerationId();
@@ -3214,7 +3214,7 @@
/**
* Returns the eligibleCN for that domain - relies on the ChangeTimeHeartbeat
* state.
- * For each DS, take the oldest CN from the changetime hearbeat state
+ * For each DS, take the oldest CN from the changetime heartbeat state
* and from the changelog db last CN. Can be null.
* @return the eligible CN.
*/
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index a0ebd0c..3282d50 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2009 Sun Microsystems, Inc.
+ * Copyright 2006-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
@@ -1219,7 +1219,8 @@
*/
public RSInfo toRSInfo()
{
- RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, weight);
+ RSInfo rsInfo = new RSInfo(serverId, serverURL, generationId, groupId,
+ weight);
return rsInfo;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index bd145d6..baa4310 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -26,12 +26,16 @@
*/
package org.opends.server.replication.service;
+import java.net.UnknownHostException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -40,10 +44,12 @@
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -51,7 +57,6 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
-import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
@@ -102,7 +107,7 @@
private Semaphore sendWindow;
private int maxSendWindow;
private int rcvWindow = 100;
- private int halfRcvWindow = rcvWindow/2;
+ private int halfRcvWindow = rcvWindow / 2;
private int maxRcvWindow = rcvWindow;
private int timeout = 0;
private short protocolVersion;
@@ -117,13 +122,11 @@
private String rsServerUrl = null;
// Our replication domain
private ReplicationDomain domain = null;
-
/**
* This object is used as a conditional event to be notified about
* the reception of monitor information from the Replication Server.
*/
private final MutableBoolean monitorResponse = new MutableBoolean(false);
-
/**
* A Map containing the ServerStates of all the replicas in the topology
* as seen by the ReplicationServer the last time it was polled or the last
@@ -131,15 +134,6 @@
*/
private HashMap<Integer, ServerState> replicaStates =
new HashMap<Integer, ServerState>();
-
- /**
- * A Map containing the ServerStates of all the replication servers in the
- * topology as seen by the ReplicationServer the last time it was polled or
- * the last time it published monitoring information.
- */
- private HashMap<Integer, ServerState> rsStates =
- new HashMap<Integer, ServerState>();
-
/**
* The expected duration in milliseconds between heartbeats received
* from the replication server. Zero means heartbeats are off.
@@ -163,10 +157,6 @@
*/
private boolean connectionError = false;
private final Object connectPhaseLock = new Object();
-
- // Same group id poller thread
- private SameGroupIdPoller sameGroupIdPoller = null;
-
/**
* The thread that publishes messages to the RS containing the current
* change time of this DS.
@@ -174,7 +164,7 @@
private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null;
/**
* The expected period in milliseconds between these messages are sent
- * to the replication server. Zero means heartbeats are off.
+ * to the replication server. Zero means heartbeats are off.
*/
private long changeTimeHeartbeatSendInterval = 0;
/*
@@ -183,12 +173,30 @@
// Info for other DSs.
// Warning: does not contain info for us (for our server id)
private List<DSInfo> dsList = new ArrayList<DSInfo>();
- // Info for other RSs.
- private List<RSInfo> rsList = new ArrayList<RSInfo>();
-
private long generationID;
private int updateDoneCount = 0;
private boolean connectRequiresRecovery = false;
+ /**
+ * The map of replication server info initialized at connection time and
+ * regularly updated. This is used to decide to which best suitable
+ * replication server one wants to connect.
+ * Key: replication server id
+ * Value: replication server info for the matching replication server id
+ */
+ private Map<Integer, ReplicationServerInfo> replicationServerInfos = null;
+ /**
+ * This integer defines when the best replication server checking algorithm
+ * should be engaged.
+ * Every time a monitoring message (each monitoring publisher period) is
+ * received, it is incremented. When it reaches 2, we run the checking
+ * algorithm to see if we must reconnect to another best replication server.
+ * Then we reset the value to 0. But when a topology message is received, the
+ * integer is reseted to 0. This insures that we wait at least one monitoring
+ * publisher period before running the algorithm, but also that we wait at
+ * least for a monitoring period after the last received topology message
+ * (topology stabilization).
+ */
+ private int mustRunBestServerCheckingAlgorithm = 0;
/**
* Creates a new ReplicationServer Broker for a particular ReplicationDomain.
@@ -228,7 +236,7 @@
this.heartbeatInterval = heartbeatInterval;
this.maxRcvWindow = window;
this.maxRcvWindow = window;
- this.halfRcvWindow = window /2;
+ this.halfRcvWindow = window / 2;
this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
}
@@ -284,7 +292,7 @@
return rsServerId;
}
- /**
+ /**
* Gets the server id.
* @return The server id
*/
@@ -300,9 +308,11 @@
private long getGenerationID()
{
if (domain != null)
- return domain.getGenerationID();
- else
- return generationID;
+ {
+ // Update the generation id
+ generationID = domain.getGenerationID();
+ }
+ return generationID;
}
/**
@@ -315,48 +325,167 @@
}
/**
- * Bag class for keeping info we get from a server in order to compute the
- * best one to connect to. This is in fact a wrapper to a
- * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4).
+ * Sets the locally configured flag for the passed ReplicationServerInfo
+ * object, analyzing the local configuration.
+ * @param
*/
- public static class ServerInfo
+ private void updateRSInfoLocallyConfiguredStatus(
+ ReplicationServerInfo replicationServerInfo)
+ {
+ // Determine if the passed ReplicationServerInfo has a URL that is present
+ // in the locally configured replication servers
+ String rsUrl = replicationServerInfo.getServerURL();
+ if (rsUrl == null)
+ {
+ // The ReplicationServerInfo has been generated from a server with
+ // no URL in TopologyMsg (i.e: with replication protocol version < 4):
+ // ignore this server as we do not know how to connect to it
+ replicationServerInfo.setLocallyConfigured(false);
+ return;
+ }
+ for (String serverUrl : servers)
+ {
+ if (isSameReplicationServerUrl(serverUrl, rsUrl))
+ {
+ // This RS is locally configured, mark this
+ replicationServerInfo.setLocallyConfigured(true);
+ return;
+ }
+ }
+ replicationServerInfo.setLocallyConfigured(false);
+ }
+
+ /**
+ * Compares 2 replication servers addresses and returns true if they both
+ * represent the same replication server instance.
+ * @param rs1Url Replication server 1 address
+ * @param rs2Url Replication server 2 address
+ * @return True if both replication server addresses represent the same
+ * replication server instance, false otherwise.
+ */
+ private static boolean isSameReplicationServerUrl(String rs1Url,
+ String rs2Url)
+ {
+ // Get and compare ports of RS1 and RS2
+ int separator1 = rs1Url.lastIndexOf(':');
+ if (separator1 < 0)
+ {
+ // Not a RS url: should not happen
+ return false;
+ }
+ int rs1Port = Integer.parseInt(rs1Url.substring(separator1 + 1));
+
+ int separator2 = rs2Url.lastIndexOf(':');
+ if (separator2 < 0)
+ {
+ // Not a RS url: should not happen
+ return false;
+ }
+ int rs2Port = Integer.parseInt(rs2Url.substring(separator2 + 1));
+
+ if (rs1Port != rs2Port)
+ {
+ return false;
+ }
+
+ // Get and compare addresses of RS1 and RS2
+ String rs1 = rs1Url.substring(0, separator1);
+ InetAddress[] rs1Addresses = null;
+ try
+ {
+ if (rs1.equals("localhost") || rs1.equals("127.0.0.1"))
+ {
+ // Replace localhost with the local official hostname
+ rs1 = InetAddress.getLocalHost().getHostName();
+ }
+ rs1Addresses = InetAddress.getAllByName(rs1);
+ } catch (UnknownHostException ex)
+ {
+ // Unknown RS: should not happen
+ return false;
+ }
+
+ String rs2 = rs2Url.substring(0, separator2);
+ InetAddress[] rs2Addresses = null;
+ try
+ {
+ if (rs2.equals("localhost") || rs2.equals("127.0.0.1"))
+ {
+ // Replace localhost with the local official hostname
+ rs2 = InetAddress.getLocalHost().getHostName();
+ }
+ rs2Addresses = InetAddress.getAllByName(rs2);
+ } catch (UnknownHostException ex)
+ {
+ // Unknown RS: should not happen
+ return false;
+ }
+
+ // Now compare addresses, if at least one match, this is the same server
+ for (int i = 0; i < rs1Addresses.length; i++)
+ {
+ InetAddress inetAddress1 = rs1Addresses[i];
+ for (int j = 0; j < rs2Addresses.length; j++)
+ {
+ InetAddress inetAddress2 = rs2Addresses[j];
+ if (inetAddress2.equals(inetAddress1))
+ {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Bag class for keeping info we get from a replication server in order to
+ * compute the best one to connect to. This is in fact a wrapper to a
+ * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be
+ * updated with a info coming from received topology messages or monitoring
+ * messages.
+ */
+ public static class ReplicationServerInfo
{
private short protocolVersion;
private long generationId;
private byte groupId = (byte) -1;
private int serverId;
+ // Received server URL
private String serverURL;
private String baseDn = null;
private int windowSize;
- private ServerState serverState;
+ private ServerState serverState = null;
private boolean sslEncryption;
private int degradedStatusThreshold = -1;
- // Keeps the -1 value if created with a ReplServerStartMsg
- private int weight = -1;
- // Keeps the -1 value if created with a ReplServerStartMsg
- private int connectedDSNumber = -1;
+ // Keeps the 1 value if created with a ReplServerStartMsg
+ private int weight = 1;
+ // Keeps the 0 value if created with a ReplServerStartMsg
+ private int connectedDSNumber = 0;
+ private List<Integer> connectedDSs = null;
+ // Is this RS locally configured ? (the RS is recognized as a usable server)
+ private boolean locallyConfigured = true;
/**
- * Create a new instance of ServerInfo wrapping the passed message.
+ * Create a new instance of ReplicationServerInfo wrapping the passed
+ * message.
* @param msg Message to wrap.
* @return The new instance wrapping the passed message.
* @throws IllegalArgumentException If the passed message has an unexpected
* type.
*/
- public static ServerInfo newServerInfo(
+ public static ReplicationServerInfo newInstance(
ReplicationMsg msg) throws IllegalArgumentException
{
if (msg instanceof ReplServerStartMsg)
{
// This is a ReplServerStartMsg (RS uses protocol V3 or under)
- ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg)msg;
- return new ServerInfo(replServerStartMsg);
- }
- else if (msg instanceof ReplServerStartDSMsg)
+ ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg) msg;
+ return new ReplicationServerInfo(replServerStartMsg);
+ } else if (msg instanceof ReplServerStartDSMsg)
{
// This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
- ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg)msg;
- return new ServerInfo(replServerStartDSMsg);
+ ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg) msg;
+ return new ReplicationServerInfo(replServerStartDSMsg);
}
// Unsupported message type: should not happen
@@ -365,10 +494,10 @@
}
/**
- * Constructs a ServerInfo object wrapping a ReplServerStartMsg.
+ * Constructs a ReplicationServerInfo object wrapping a ReplServerStartMsg.
* @param replServerStartMsg The ReplServerStartMsg this object will wrap.
*/
- private ServerInfo(ReplServerStartMsg replServerStartMsg)
+ private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg)
{
this.protocolVersion = replServerStartMsg.getVersion();
this.generationId = replServerStartMsg.getGenerationId();
@@ -384,11 +513,12 @@
}
/**
- * Constructs a ServerInfo object wrapping a ReplServerStartDSMsg.
+ * Constructs a ReplicationServerInfo object wrapping a
+ * ReplServerStartDSMsg.
* @param replServerStartDSMsg The ReplServerStartDSMsg this object will
* wrap.
*/
- private ServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
+ private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
{
this.protocolVersion = replServerStartDSMsg.getVersion();
this.generationId = replServerStartDSMsg.getGenerationId();
@@ -514,16 +644,98 @@
{
return connectedDSNumber;
}
+
+ /**
+ * Constructs a new replication server info with the passed RSInfo
+ * internal values and the passed connected DSs.
+ * @param rsInfo The RSinfo to use for the update
+ * @param connectedDSs The new connected DSs
+ */
+ public ReplicationServerInfo(RSInfo rsInfo, List<Integer> connectedDSs)
+ {
+ this.serverId = rsInfo.getId();
+ this.serverURL = rsInfo.getServerUrl();
+ this.generationId = rsInfo.getGenerationId();
+ this.groupId = rsInfo.getGroupId();
+ this.weight = rsInfo.getWeight();
+ this.connectedDSs = connectedDSs;
+ this.connectedDSNumber = connectedDSs.size();
+ }
+
+ /**
+ * Converts the object to a RSInfo object.
+ * @return The RSInfo object matching this object.
+ */
+ public RSInfo toRSInfo()
+ {
+ return new RSInfo(serverId, serverURL, generationId, groupId, weight);
+ }
+
+ /**
+ * Updates replication server info with the passed RSInfo internal values
+ * and the passed connected DSs.
+ * @param rsInfo The RSinfo to use for the update
+ * @param connectedDSs The new connected DSs
+ */
+ public void update(RSInfo rsInfo, List<Integer> connectedDSs)
+ {
+ this.generationId = rsInfo.getGenerationId();
+ this.groupId = rsInfo.getGroupId();
+ this.weight = rsInfo.getWeight();
+ this.connectedDSs = connectedDSs;
+ this.connectedDSNumber = connectedDSs.size();
+ }
+
+ /**
+ * Updates replication server info with the passed server state.
+ * @param serverState The ServerState to use for the update
+ */
+ public void update(ServerState serverState)
+ {
+ if (this.serverState != null)
+ {
+ this.serverState.update(serverState);
+ } else
+ {
+ this.serverState = serverState;
+ }
+ }
+
+ /**
+ * Get the getConnectedDSs.
+ * @return the getConnectedDSs
+ */
+ public List<Integer> getConnectedDSs()
+ {
+ return connectedDSs;
+ }
+
+ /**
+ * Gets the locally configured status for this RS.
+ * @return the locallyConfigured
+ */
+ public boolean isLocallyConfigured()
+ {
+ return locallyConfigured;
+ }
+
+ /**
+ * Sets the locally configured status for this RS.
+ * @param locallyConfigured the locallyConfigured to set
+ */
+ public void setLocallyConfigured(boolean locallyConfigured)
+ {
+ this.locallyConfigured = locallyConfigured;
+ }
}
private void connect()
{
if (this.baseDn.compareToIgnoreCase(
- ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)==0)
+ ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT) == 0)
{
connectAsECL();
- }
- else
+ } else
{
connectAsDataServer();
}
@@ -534,19 +746,22 @@
* able to choose the more suitable.
* @return the collected information.
*/
- private Map<String, ServerInfo> collectReplicationServersInfo() {
+ private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
+ {
- Map<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+ Map<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
for (String server : servers)
{
// Connect to server and get info about it
- ServerInfo serverInfo = performPhaseOneHandshake(server, false);
+ ReplicationServerInfo replicationServerInfo =
+ performPhaseOneHandshake(server, false);
// Store server info in list
- if (serverInfo != null)
+ if (replicationServerInfo != null)
{
- rsInfos.put(server, serverInfo);
+ rsInfos.put(replicationServerInfo.getServerId(), replicationServerInfo);
}
}
@@ -558,7 +773,6 @@
* are :
* - 1 single RS configured
* - so no choice of the preferred RS
- * - No same groupID polling
* - ?? Heartbeat
* - Start handshake is :
* Broker ---> StartECLMsg ---> RS
@@ -570,10 +784,10 @@
// FIXME:ECL List of RS to connect is for now limited to one RS only
String bestServer = this.servers.iterator().next();
- ReplServerStartDSMsg inReplServerStartDSMsg
- = performECLPhaseOneHandshake(bestServer, true);
+ ReplServerStartDSMsg inReplServerStartDSMsg = performECLPhaseOneHandshake(
+ bestServer, true);
- if (inReplServerStartDSMsg!=null)
+ if (inReplServerStartDSMsg != null)
performECLPhaseTwoHandshake(bestServer);
}
@@ -589,7 +803,7 @@
*
* phase 1:
* DS --- ServerStartMsg ---> RS
- * DS <--- ReplServerStartMsg --- RS
+ * DS <--- ReplServerStartDSMsg --- RS
* phase 2:
* DS --- StartSessionMsg ---> RS
* DS <--- TopologyMsg --- RS
@@ -615,9 +829,9 @@
}
// Stop any existing poller and heartbeat monitor from a previous session.
- stopSameGroupIdPoller();
stopRSHeartBeatMonitoring();
stopChangeTimeHeartBeatPublishing();
+ mustRunBestServerCheckingAlgorithm = 0;
boolean newServerWithSameGroupId = false;
synchronized (connectPhaseLock)
@@ -628,41 +842,47 @@
*/
if (debugEnabled())
TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " +
- " order to elect the preferred one");
+ " order to elect the preferred one");
// Get info from every available replication servers
- Map<String, ServerInfo> rsInfos = collectReplicationServersInfo();
+ replicationServerInfos = collectReplicationServersInfo();
- ServerInfo serverInfo = null;
+ ReplicationServerInfo replicationServerInfo = null;
- if (rsInfos.size() > 0)
+ if (replicationServerInfos.size() > 0)
{
// At least one server answered, find the best one.
- String bestServer = computeBestReplicationServer(state, rsInfos,
- serverId, baseDn, groupId);
+ replicationServerInfo = computeBestReplicationServer(true, -1, state,
+ replicationServerInfos, serverId, baseDn, groupId,
+ this.getGenerationID());
// Best found, now initialize connection to this one (handshake phase 1)
if (debugEnabled())
TRACER.debugInfo(
- "phase 2 : will perform PhaseOneH with the preferred RS.");
- serverInfo = performPhaseOneHandshake(bestServer, true);
+ "phase 2 : will perform PhaseOneH with the preferred RS.");
+ replicationServerInfo = performPhaseOneHandshake(
+ replicationServerInfo.getServerURL(), true);
+ // Update replication server info with potentially more up to date data
+ // (server state for instance may have changed)
+ replicationServerInfos.put(replicationServerInfo.getServerId(),
+ replicationServerInfo);
- if (serverInfo != null) // Handshake phase 1 exchange went well
-
+ if (replicationServerInfo != null)
{
+ // Handshake phase 1 exchange went well
+
// Compute in which status we are starting the session to tell the RS
ServerStatus initStatus =
- computeInitialServerStatus(serverInfo.getGenerationId(),
- serverInfo.getServerState(),
- serverInfo.getDegradedStatusThreshold(),
+ computeInitialServerStatus(replicationServerInfo.getGenerationId(),
+ replicationServerInfo.getServerState(),
+ replicationServerInfo.getDegradedStatusThreshold(),
this.getGenerationID());
- // Perfom session start (handshake phase 2)
- TopologyMsg topologyMsg = performPhaseTwoHandshake(bestServer,
- initStatus);
+ // Perform session start (handshake phase 2)
+ TopologyMsg topologyMsg = performPhaseTwoHandshake(
+ replicationServerInfo.getServerURL(), initStatus);
if (topologyMsg != null) // Handshake phase 2 exchange went well
-
{
try
{
@@ -681,7 +901,7 @@
* reconnection at that time to retrieve a server with our group
* id.
*/
- byte tmpRsGroupId = serverInfo.getGroupId();
+ byte tmpRsGroupId = replicationServerInfo.getGroupId();
boolean someServersWithSameGroupId =
hasSomeServerWithSameGroupId(topologyMsg.getRsList());
@@ -690,10 +910,10 @@
((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
{
replicationServer = session.getReadableRemoteAddress();
- maxSendWindow = serverInfo.getWindowSize();
- rsGroupId = serverInfo.getGroupId();
- rsServerId = serverInfo.getServerId();
- rsServerUrl = bestServer;
+ maxSendWindow = replicationServerInfo.getWindowSize();
+ rsGroupId = replicationServerInfo.getGroupId();
+ rsServerId = replicationServerInfo.getServerId();
+ rsServerUrl = replicationServerInfo.getServerURL();
receiveTopo(topologyMsg);
@@ -715,27 +935,27 @@
if (domain != null)
{
domain.sessionInitiated(
- initStatus, serverInfo.getServerState(),
- serverInfo.getGenerationId(),
- session);
+ initStatus, replicationServerInfo.getServerState(),
+ replicationServerInfo.getGenerationId(),
+ session);
}
if (getRsGroupId() != groupId)
{
- // Connected to replication server with wrong group id:
- // warn user and start poller to recover when a server with
- // right group id arrives...
- Message message =
- WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
- Byte.toString(groupId), Integer.toString(rsServerId),
- bestServer, Byte.toString(getRsGroupId()),
- baseDn.toString(), Integer.toString(serverId));
- logError(message);
- startSameGroupIdPoller();
+ // Connected to replication server with wrong group id:
+ // warn user and start poller to recover when a server with
+ // right group id arrives...
+ Message message =
+ WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
+ Byte.toString(groupId), Integer.toString(rsServerId),
+ replicationServerInfo.getServerURL(),
+ Byte.toString(getRsGroupId()),
+ baseDn.toString(), Integer.toString(serverId));
+ logError(message);
}
startRSHeartBeatMonitoring();
- if (serverInfo.getProtocolVersion()
- >= ProtocolVersion.REPLICATION_PROTOCOL_V3)
+ if (replicationServerInfo.getProtocolVersion() >=
+ ProtocolVersion.REPLICATION_PROTOCOL_V3)
{
startChangeTimeHeartBeatPublishing();
}
@@ -753,7 +973,7 @@
} catch (Exception e)
{
Message message = ERR_COMPUTING_FAKE_OPS.get(
- baseDn, bestServer,
+ baseDn, replicationServerInfo.getServerURL(),
e.getLocalizedMessage() + stackTraceToSingleLineString(e));
logError(message);
} finally
@@ -783,8 +1003,9 @@
{
connectPhaseLock.notify();
- if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
- (serverInfo.getGenerationId() == -1))
+ if ((replicationServerInfo.getGenerationId() ==
+ this.getGenerationID()) ||
+ (replicationServerInfo.getGenerationId() == -1))
{
Message message =
NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
@@ -801,7 +1022,7 @@
baseDn.toString(),
replicationServer,
Long.toString(this.getGenerationID()),
- Long.toString(serverInfo.getGenerationId()));
+ Long.toString(replicationServerInfo.getGenerationId()));
logError(message);
}
} else
@@ -908,7 +1129,7 @@
/**
* Connect to the provided server performing the first phase handshake
* (start messages exchange) and return the reply message from the replication
- * server, wrapped in a ServerInfo object.
+ * server, wrapped in a ReplicationServerInfo object.
*
* @param server Server to connect to.
* @param keepConnection Do we keep session opened or not after handshake.
@@ -917,10 +1138,10 @@
* @return The answer from the server . Null if could not
* get an answer.
*/
- private ServerInfo performPhaseOneHandshake(String server,
+ private ReplicationServerInfo performPhaseOneHandshake(String server,
boolean keepConnection)
{
- ServerInfo serverInfo = null;
+ ReplicationServerInfo replServerInfo = null;
// Parse server string.
int separator = server.lastIndexOf(':');
@@ -962,17 +1183,17 @@
ReplicationMsg msg = localSession.receive();
if (debugEnabled())
- {
- TRACER.debugInfo("In RB for " + baseDn +
- "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
- "\nAND RECEIVED:\n" + msg.toString());
- }
+ {
+ TRACER.debugInfo("In RB for " + baseDn +
+ "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
+ "\nAND RECEIVED:\n" + msg.toString());
+ }
// Wrap received message in a server info object
- serverInfo = ServerInfo.newServerInfo(msg);
+ replServerInfo = ReplicationServerInfo.newInstance(msg);
// Sanity check
- String repDn = serverInfo.getBaseDn();
+ String repDn = replServerInfo.getBaseDn();
if (!(this.baseDn.equals(repDn)))
{
Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -987,7 +1208,7 @@
* if it is an old replication server).
*/
protocolVersion = ProtocolVersion.minWithCurrent(
- serverInfo.getProtocolVersion());
+ replServerInfo.getProtocolVersion());
localSession.setProtocolVersion(protocolVersion);
@@ -1017,7 +1238,7 @@
error = true;
} catch (Exception e)
{
- if ( (e instanceof SocketTimeoutException) && debugEnabled() )
+ if ((e instanceof SocketTimeoutException) && debugEnabled())
{
TRACER.debugInfo("Timeout trying to connect to RS " + server +
" for dn: " + baseDn);
@@ -1068,7 +1289,7 @@
}
if (error)
{
- serverInfo = null;
+ replServerInfo = null;
} // Be sure to return null.
}
@@ -1080,7 +1301,7 @@
session = localSession;
}
- return serverInfo;
+ return replServerInfo;
}
/**
@@ -1126,11 +1347,11 @@
// Send our start msg.
ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg(
- baseDn, 0, 0, 0, 0,
- maxRcvWindow, heartbeatInterval, state,
- ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
- isSslEncryption,
- groupId);
+ baseDn, 0, 0, 0, 0,
+ maxRcvWindow, heartbeatInterval, state,
+ ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
+ isSslEncryption,
+ groupId);
localSession.publish(serverStartECLMsg);
// Read the ReplServerStartMsg that should come back.
@@ -1189,7 +1410,7 @@
error = true;
} catch (Exception e)
{
- if ( (e instanceof SocketTimeoutException) && debugEnabled() )
+ if ((e instanceof SocketTimeoutException) && debugEnabled())
{
TRACER.debugInfo("Timeout trying to connect to RS " + server +
" for dn: " + baseDn);
@@ -1282,7 +1503,7 @@
{
TRACER.debugInfo("In RB for " + baseDn +
"\nRB HANDSHAKE SENT:\n" + startECLSessionMsg.toString());
- // + "\nAND RECEIVED:\n" + topologyMsg.toString());
+ // + "\nAND RECEIVED:\n" + topologyMsg.toString());
}
// Alright set the timeout to the desired value
@@ -1340,13 +1561,13 @@
{
startSessionMsg =
new StartSessionMsg(
- initStatus,
- domain.getRefUrls(),
- domain.isAssured(),
- domain.getAssuredMode(),
- domain.getAssuredSdLevel());
+ initStatus,
+ domain.getRefUrls(),
+ domain.isAssured(),
+ domain.getAssuredMode(),
+ domain.getAssuredSdLevel());
startSessionMsg.setEclIncludes(
- domain.getEclInclude());
+ domain.getEclInclude());
} else
{
startSessionMsg =
@@ -1395,269 +1616,560 @@
/**
* Returns the replication server that best fits our need so that we can
- * connect to it.
- * This methods performs some filtering on the group id, then call
- * the real search for best server algorithm (searchForBestReplicationServer).
+ * connect to it or determine if we must disconnect from current one to
+ * re-connect to best server.
*
- * Note: this method put as public static for unit testing purpose.
+ * Note: this method is static for test purpose (access from unit tests)
*
+ * @param firstConnection True if we run this method for the very first
+ * connection of the broker. False if we run this method to determine if the
+ * replication server we are currently connected to is still the best or not.
+ * @param rsServerId The id of the replication server we are currently
+ * connected to. Only used when firstConnection is false.
* @param myState The local server state.
* @param rsInfos The list of available replication servers and their
- * associated information (choice will be made among them).
+ * associated information (choice will be made among them).
* @param localServerId The server id for the suffix we are working for.
* @param baseDn The suffix for which we are working for.
* @param groupId The groupId we prefer being connected to if possible
- * @return The computed best replication server.
+ * @param generationId The generation id we are using
+ * @return The computed best replication server. If the returned value is
+ * null, the best replication server is undetermined but the local server must
+ * disconnect (so the best replication server is another one than the current
+ * one). Null can only be returned when firstConnection is false.
*/
- public static String computeBestReplicationServer(ServerState myState,
- Map<String, ServerInfo> rsInfos, int localServerId,
- String baseDn, byte groupId)
+ public static ReplicationServerInfo computeBestReplicationServer(
+ boolean firstConnection, int rsServerId, ServerState myState,
+ Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
+ String baseDn, byte groupId, long generationId)
{
- /*
- * Preference is given to servers with the requested group id:
- * If there are some servers with the requested group id in the provided
- * server list, then we run the search algorithm only on them. If no server
- * with the requested group id, consider all of them.
- */
-
- // Filter for servers with same group id
- Map<String, ServerInfo> sameGroupIdRsInfos =
- new HashMap<String, ServerInfo>();
-
- for (String repServer : rsInfos.keySet())
- {
- ServerInfo serverInfo = rsInfos.get(repServer);
- if (serverInfo.getGroupId() == groupId)
- sameGroupIdRsInfos.put(repServer, serverInfo);
- }
-
- // Some servers with same group id ?
- if (sameGroupIdRsInfos.size() > 0)
- {
- return searchForBestReplicationServer(myState, sameGroupIdRsInfos,
- localServerId, baseDn);
- } else
- {
- return searchForBestReplicationServer(myState, rsInfos,
- localServerId, baseDn);
- }
- }
-
- /**
- * Returns the replication server that best fits our need so that we can
- * connect to it.
- *
- * Note: this method put as public static for unit testing purpose.
- *
- * @param myState The local server state.
- * @param rsInfos The list of available replication servers and their
- * associated information (choice will be made among them).
- * @param localServerID The server id for the suffix we are working for.
- * @param baseDn The suffix for which we are working for.
- * @return The computed best replication server.
- */
- private static String searchForBestReplicationServer(ServerState myState,
- Map<String, ServerInfo> rsInfos, int localServerID, String baseDn)
- {
- /*
- * Find replication servers who are up to date (or more up to date than us,
- * if for instance we failed and restarted, having sent some changes to the
- * RS but without having time to store our own state) regarding our own
- * server id. Then, among them, choose the server that is the most up to
- * date regarding the whole topology.
- *
- * If no server is up to date regarding our own server id, find the one who
- * is the most up to date regarding our server id.
- */
-
- // Should never happen (sanity check)
- if ((myState == null) || (rsInfos == null) || (rsInfos.size() < 1) ||
- (baseDn == null))
- {
- return null;
- }
// Shortcut, if only one server, this is the best
if (rsInfos.size() == 1)
{
- for (String repServer : rsInfos.keySet())
- return repServer;
+ return rsInfos.values().iterator().next();
}
- String bestServer = null;
- boolean bestServerIsLocal = false;
-
- // Servers up to dates with regard to our changes
- HashMap<String, ServerState> upToDateServers =
- new HashMap<String, ServerState>();
- // Servers late with regard to our changes
- HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
-
- /*
- * Start loop to differentiate up to date servers from late ones.
+ /**
+ * Apply some filtering criteria to determine the best servers list from
+ * the available ones. The ordered list of criteria is (from more important
+ * to less important):
+ * - replication server has the same group id as the local DS one
+ * - replication server has the same generation id as the local DS one
+ * - replication server is up to date regarding changes generated by the
+ * local DS
+ * - replication server in the same VM as local DS one
*/
- ChangeNumber myChangeNumber = myState.getMaxChangeNumber(localServerID);
- if (myChangeNumber == null)
+ Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
+ Map<Integer, ReplicationServerInfo> newBestServers;
+ // The list of best replication servers is filtered with each criteria. At
+ // each criteria, the list is replaced with the filtered one if some there
+ // are some servers from the filtering, otherwise, the list is left as is
+ // and the new filtering for the next criteria is applied and so on.
+ for (int filterLevel = 1; filterLevel <= 4; filterLevel++)
{
- myChangeNumber = new ChangeNumber(0, 0, localServerID);
- }
- for (String repServer : rsInfos.keySet())
- {
-
- ServerState rsState = rsInfos.get(repServer).getServerState();
- ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerID);
- if (rsChangeNumber == null)
+ newBestServers = null;
+ switch (filterLevel)
{
- rsChangeNumber = new ChangeNumber(0, 0, localServerID);
+ case 1:
+ // Use only servers locally configured: those are servers declared in
+ // the local configuration. When the current method is called, for
+ // sure, at least one server from the list is locally configured
+ bestServers = filterServersLocallyConfigured(bestServers);
+ break;
+ case 2:
+ // Some servers with same group id ?
+ newBestServers = filterServersWithSameGroupId(bestServers, groupId);
+ if (newBestServers.size() > 0)
+ {
+ bestServers = newBestServers;
+ }
+ break;
+ case 3:
+ // Some servers with same generation id ?
+ newBestServers = filterServersWithSameGenerationId(bestServers,
+ generationId);
+ if (newBestServers.size() > 0)
+ {
+ // Ok some servers with the right generation id
+ bestServers = newBestServers;
+ // If some servers with the right generation id this is useful to
+ // run the local DS change criteria
+ newBestServers = filterServersWithAllLocalDSChanges(bestServers,
+ myState, localServerId);
+ if (newBestServers.size() > 0)
+ {
+ bestServers = newBestServers;
+ }
+ }
+ break;
+ case 4:
+ // Some servers in the local VM ?
+ newBestServers = filterServersInSameVM(bestServers);
+ if (newBestServers.size() > 0)
+ {
+ bestServers = newBestServers;
+ }
+ break;
}
+ }
- // Store state in right list
- if (myChangeNumber.olderOrEqual(rsChangeNumber))
+ /**
+ * Now apply the choice base on the weight to the best servers list
+ */
+ if (bestServers.size() > 1)
+ {
+ if (firstConnection)
{
- upToDateServers.put(repServer, rsState);
+ // We are no connected to a server yet
+ return computeBestServerForWeight(bestServers, -1, -1);
} else
{
- lateOnes.put(repServer, rsState);
+ // We are already connected to a RS: compute the best RS as far as the
+ // weights is concerned. If this is another one, some DS must
+ // disconnect.
+ return computeBestServerForWeight(bestServers, rsServerId,
+ localServerId);
}
- }
-
- if (upToDateServers.size() > 0)
- {
- /*
- * Some up to date servers, among them, choose either :
- * - The local one
- * - The one that has the maximum number of changes to send us.
- * This is the most up to date one regarding the whole topology.
- * This server is the one which has the less
- * difference with the topology server state.
- * For comparison, we need to compute the difference for each
- * server id with the topology server state.
- */
-
- Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
- upToDateServers.size(), baseDn, Integer.toString(localServerID));
- logError(message);
-
- /*
- * First of all, compute the virtual server state for the whole topology,
- * which is composed of the most up to date change numbers for
- * each server id in the topology.
- */
- ServerState topoState = new ServerState();
- for (ServerState curState : upToDateServers.values())
- {
-
- Iterator<Integer> it = curState.iterator();
- while (it.hasNext())
- {
- Integer sId = it.next();
- ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
- if (curSidCn == null)
- {
- curSidCn = new ChangeNumber(0, 0, sId);
- }
- // Update topology state
- topoState.update(curSidCn);
- }
- } // For up to date servers
-
- // Min of the max shifts
- long minShift = -1L;
- for (String upServer : upToDateServers.keySet())
- {
-
- /*
- * Compute the maximum difference between the time of a server id's
- * change number and the time of the matching server id's change
- * number in the topology server state.
- *
- * Note: we could have used the sequence number here instead of the
- * timestamp, but this would have caused a problem when the sequence
- * number loops and comes back to 0 (computation would have becomen
- * meaningless).
- */
- long shift = 0;
- ServerState curState = upToDateServers.get(upServer);
- Iterator<Integer> it = curState.iterator();
- while (it.hasNext())
- {
- Integer sId = it.next();
- ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
- if (curSidCn == null)
- {
- curSidCn = new ChangeNumber(0, 0, sId);
- }
- // Cannot be null as checked at construction time
- ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
- // Cannot be negative as topoState computed as being the max CN
- // for each server id in the topology
- long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
- shift +=tmpShift;
- }
-
- boolean upServerIsLocal =
- ReplicationServer.isLocalReplicationServer(upServer);
- if ((minShift < 0) // First time in loop
- || ((shift < minShift) && upServerIsLocal)
- || (((bestServerIsLocal == false) && (shift < minShift)))
- || ((bestServerIsLocal == false) && (upServerIsLocal &&
- (shift<(minShift + 60)) ))
- || (shift+120 < minShift))
- {
- // This server is even closer to topo state
- bestServer = upServer;
- bestServerIsLocal = upServerIsLocal;
- minShift = shift;
- }
- } // For up to date servers
-
} else
{
- /*
- * We could not find a replication server that has seen all the
- * changes that this server has already processed,
- */
- // lateOnes cannot be empty
- Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
- baseDn, lateOnes.size());
- logError(message);
-
- // Min of the shifts
- long minShift = -1L;
- for (String lateServer : lateOnes.keySet())
- {
- /*
- * Choose the server who is the closest to us regarding our server id
- * (this is the most up to date regarding our server id).
- */
- ServerState curState = lateOnes.get(lateServer);
- ChangeNumber ourSidCn = curState.getMaxChangeNumber(localServerID);
- if (ourSidCn == null)
- {
- ourSidCn = new ChangeNumber(0, 0, localServerID);
- }
- // Cannot be negative as our Cn for our server id is strictly
- // greater than those of the servers in late server list
- long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
-
- boolean lateServerisLocal =
- ReplicationServer.isLocalReplicationServer(lateServer);
- if ((minShift < 0) // First time in loop
- || ((tmpShift < minShift) && lateServerisLocal)
- || (((bestServerIsLocal == false) && (tmpShift < minShift)))
- || ((bestServerIsLocal == false) && (lateServerisLocal &&
- (tmpShift<(minShift + 60)) ))
- || (tmpShift+120 < minShift))
- {
- // This server is even closer to topo state
- bestServer = lateServer;
- bestServerIsLocal = lateServerisLocal;
- minShift = tmpShift;
- }
- } // For late servers
-
+ return bestServers.values().iterator().next();
}
- return bestServer;
+ }
+
+ /**
+ * Creates a new list that contains only replication servers that are locally
+ * configured.
+ * @param bestServers The list of replication servers to filter
+ * @return The sub list of replication servers locally configured
+ */
+ private static Map<Integer, ReplicationServerInfo>
+ filterServersLocallyConfigured(Map<Integer,
+ ReplicationServerInfo> bestServers)
+ {
+ Map<Integer, ReplicationServerInfo> result =
+ new HashMap<Integer, ReplicationServerInfo>();
+
+ for (Integer rsId : bestServers.keySet())
+ {
+ ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+ if (replicationServerInfo.isLocallyConfigured())
+ {
+ result.put(rsId, replicationServerInfo);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Creates a new list that contains only replication servers that have the
+ * passed group id, from a passed replication server list.
+ * @param bestServers The list of replication servers to filter
+ * @param groupId The group id that must match
+ * @return The sub list of replication servers matching the requested group id
+ * (which may be empty)
+ */
+ private static Map<Integer, ReplicationServerInfo>
+ filterServersWithSameGroupId(Map<Integer,
+ ReplicationServerInfo> bestServers, byte groupId)
+ {
+ Map<Integer, ReplicationServerInfo> result =
+ new HashMap<Integer, ReplicationServerInfo>();
+
+ for (Integer rsId : bestServers.keySet())
+ {
+ ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+ if (replicationServerInfo.getGroupId() == groupId)
+ {
+ result.put(rsId, replicationServerInfo);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Creates a new list that contains only replication servers that have the
+ * passed generation id, from a passed replication server list.
+ * @param bestServers The list of replication servers to filter
+ * @param generationId The generation id that must match
+ * @return The sub list of replication servers matching the requested
+ * generation id (which may be empty)
+ */
+ private static Map<Integer, ReplicationServerInfo>
+ filterServersWithSameGenerationId(Map<Integer,
+ ReplicationServerInfo> bestServers, long generationId)
+ {
+ Map<Integer, ReplicationServerInfo> result =
+ new HashMap<Integer, ReplicationServerInfo>();
+
+ for (Integer rsId : bestServers.keySet())
+ {
+ ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+ if (replicationServerInfo.getGenerationId() == generationId)
+ {
+ result.put(rsId, replicationServerInfo);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Creates a new list that contains only replication servers that have the
+ * latest changes from the passed DS, from a passed replication server list.
+ * @param bestServers The list of replication servers to filter
+ * @param localState The state of the local DS
+ * @param localServerId The server id to consider for the changes
+ * @return The sub list of replication servers that have the latest changes
+ * from the passed DS (which may be empty)
+ */
+ private static Map<Integer, ReplicationServerInfo>
+ filterServersWithAllLocalDSChanges(Map<Integer,
+ ReplicationServerInfo> bestServers, ServerState localState,
+ int localServerId)
+ {
+ Map<Integer, ReplicationServerInfo> upToDateServers =
+ new HashMap<Integer, ReplicationServerInfo>();
+ Map<Integer, ReplicationServerInfo> moreUpToDateServers =
+ new HashMap<Integer, ReplicationServerInfo>();
+
+ // Extract the change number of the latest change generated by the local
+ // server
+ ChangeNumber myChangeNumber = localState.getMaxChangeNumber(localServerId);
+ if (myChangeNumber == null)
+ {
+ myChangeNumber = new ChangeNumber(0, 0, localServerId);
+ }
+
+ /**
+ * Find replication servers who are up to date (or more up to date than us,
+ * if for instance we failed and restarted, having sent some changes to the
+ * RS but without having time to store our own state) regarding our own
+ * server id. If some servers more up to date, prefer this list.
+ */
+ for (Integer rsId : bestServers.keySet())
+ {
+ ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+ ServerState rsState = replicationServerInfo.getServerState();
+ ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerId);
+ if (rsChangeNumber == null)
+ {
+ rsChangeNumber = new ChangeNumber(0, 0, localServerId);
+ }
+
+ // Has this replication server the latest local change ?
+ if (myChangeNumber.olderOrEqual(rsChangeNumber))
+ {
+ if (myChangeNumber.equals(rsChangeNumber))
+ {
+ // This replication server has exactly the latest change from the
+ // local server
+ upToDateServers.put(rsId, replicationServerInfo);
+ } else
+ {
+ // This replication server is even more up to date than the local
+ // server
+ moreUpToDateServers.put(rsId, replicationServerInfo);
+ }
+ }
+ }
+ if (moreUpToDateServers.size() > 0)
+ {
+ // Prefer servers more up to date than local server
+ return moreUpToDateServers;
+ } else
+ {
+ return upToDateServers;
+ }
+ }
+
+ /**
+ * Creates a new list that contains only replication servers that are in the
+ * same VM as the local DS, from a passed replication server list.
+ * @param bestServers The list of replication servers to filter
+ * @return The sub list of replication servers being in the same VM as the
+ * local DS (which may be empty)
+ */
+ private static Map<Integer, ReplicationServerInfo> filterServersInSameVM(
+ Map<Integer, ReplicationServerInfo> bestServers)
+ {
+ Map<Integer, ReplicationServerInfo> result =
+ new HashMap<Integer, ReplicationServerInfo>();
+
+ for (Integer rsId : bestServers.keySet())
+ {
+ ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+ if (ReplicationServer.isLocalReplicationServer(
+ replicationServerInfo.getServerURL()))
+ {
+ result.put(rsId, replicationServerInfo);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Computes the best replication server the local server should be connected
+ * to so that the load is correctly spread across the topology, following the
+ * weights guidance.
+ * Warning: This method is expected to be called with at least 2 servers in
+ * bestServers
+ * Note: this method is static for test purpose (access from unit tests)
+ * @param bestServers The list of replication servers to consider
+ * @param currentRsServerId The replication server the local server is
+ * currently connected to. -1 if the local server is not yet connected
+ * to any replication server.
+ * @param localServerId The server id of the local server. This is not used
+ * when it is not connected to a replication server
+ * (currentRsServerId = -1)
+ * @return The replication server the local server should be connected to
+ * as far as the weight is concerned. This may be the currently used one if
+ * the weight is correctly spread. If the returned value is null, the best
+ * replication server is undetermined but the local server must disconnect
+ * (so the best replication server is another one than the current one).
+ */
+ public static ReplicationServerInfo computeBestServerForWeight(
+ Map<Integer, ReplicationServerInfo> bestServers, int currentRsServerId,
+ int localServerId)
+ {
+ /*
+ * - Compute the load goal of each RS, deducing it from the weights affected
+ * to them.
+ * - Compute the current load of each RS, deducing it from the DSs
+ * currently connected to them.
+ * - Compute the differences between the load goals and the current loads of
+ * the RSs.
+ */
+ // Sum of the weights
+ int sumOfWeights = 0;
+ // Sum of the connected DSs
+ int sumOfConnectedDSs = 0;
+ for (ReplicationServerInfo replicationServerInfo : bestServers.values())
+ {
+ sumOfWeights += replicationServerInfo.getWeight();
+ sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
+ }
+ // Distance (difference) of the current loads to the load goals of each RS:
+ // key:server id, value: distance
+ Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
+ // Precision for the operations (number of digits after the dot)
+ // Default value of rounding method is HALF_UP for
+ // the MathContext
+ MathContext mathContext = new MathContext(32);
+ for (Integer rsId : bestServers.keySet())
+ {
+ ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+
+ int rsWeight = replicationServerInfo.getWeight();
+ // load goal = rs weight / sum of weights
+ BigDecimal loadGoalBd = (new BigDecimal(rsWeight)).divide(
+ new BigDecimal(sumOfWeights), mathContext);
+ BigDecimal currentLoadBd = BigDecimal.ZERO;
+ if (sumOfConnectedDSs != 0)
+ {
+ // current load = number of connected DSs / total number of DSs
+ int connectedDSs = replicationServerInfo.getConnectedDSNumber();
+ currentLoadBd = (new BigDecimal(connectedDSs)).divide(
+ new BigDecimal(sumOfConnectedDSs), mathContext);
+ }
+ // load distance = load goal - current load
+ BigDecimal loadDistanceBd =
+ loadGoalBd.subtract(currentLoadBd, mathContext);
+ loadDistances.put(rsId, loadDistanceBd);
+ }
+
+ if (currentRsServerId == -1)
+ {
+ // The local server is not connected yet
+
+ /*
+ * Find the server with the current highest distance to its load goal and
+ * choose it. Make an exception if every server is correctly balanced,
+ * that is every current load distances are equal to 0, in that case,
+ * choose the server with the highest weight
+ */
+ int bestRsId = 0; // If all server equal, return the first one
+ float highestDistance = Float.NEGATIVE_INFINITY;
+ boolean allRsWithZeroDistance = true;
+ int highestWeightRsId = -1;
+ int highestWeight = -1;
+ for (Integer rsId : bestServers.keySet())
+ {
+ float loadDistance = loadDistances.get(rsId).floatValue();
+ if (loadDistance > highestDistance)
+ {
+ // This server is far more from its balance point
+ bestRsId = rsId;
+ highestDistance = loadDistance;
+ }
+ if (loadDistance != (float)0)
+ {
+ allRsWithZeroDistance = false;
+ }
+ int weight = bestServers.get(rsId).getWeight();
+ if (weight > highestWeight)
+ {
+ // This server has a higher weight
+ highestWeightRsId = rsId;
+ highestWeight = weight;
+ }
+ }
+ // All servers with a 0 distance ?
+ if (allRsWithZeroDistance)
+ {
+ // Choose server withe the highest weight
+ bestRsId = highestWeightRsId;
+ }
+ return bestServers.get(bestRsId);
+ } else
+ {
+ // The local server is currently connected to a RS, let's see if it must
+ // disconnect or not, taking the weights into account.
+
+ float currentLoadDistance =
+ loadDistances.get(currentRsServerId).floatValue();
+ if (currentLoadDistance < (float) 0)
+ {
+ // Too much DSs connected to the current RS, compared with its load
+ // goal:
+ // Determine the potential number of DSs to disconnect from the current
+ // RS and see if the local DS is part of them: the DSs that must
+ // disconnect are those with the lowest server id.
+ // Compute the sum of the distances of the load goals of the other RSs
+ BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
+ for (Integer rsId : bestServers.keySet())
+ {
+ if (rsId != currentRsServerId)
+ {
+ sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
+ loadDistances.get(rsId), mathContext);
+ }
+ }
+
+ if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > (float) 0)
+ {
+ // The average distance of the other RSs shows a lack of DSs.
+ // Compute the number of DSs to disconnect from the current RS,
+ // rounding to the nearest integer number. Do only this if there is
+ // no risk of yoyo effect: when the exact balance cannot be
+ // established due to the current number of DSs connected, do not
+ // disconnect a DS. A simple example where the balance cannot be
+ // reached is:
+ // - RS1 has weight 1 and 2 DSs
+ // - RS2 has weight 1 and 1 DS
+ // => disconnecting a DS from RS1 to reconnect it to RS2 would have no
+ // sense as this would lead to the reverse situation. In that case,
+ // the perfect balance cannot be reached and we must stick to the
+ // current situation, otherwise the DS would keep move between the 2
+ // RSs
+ float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
+ multiply(new BigDecimal(sumOfConnectedDSs), mathContext).
+ floatValue();
+ int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
+
+ // Avoid yoyo effect
+ if (overloadingDSsNumber == 1)
+ {
+ // What would be the new load distance for the current RS if
+ // we disconnect some DSs ?
+ ReplicationServerInfo currentReplicationServerInfo =
+ bestServers.get(currentRsServerId);
+
+ int currentRsWeight = currentReplicationServerInfo.getWeight();
+ BigDecimal currentRsWeightBd = new BigDecimal(currentRsWeight);
+ BigDecimal sumOfWeightsBd = new BigDecimal(sumOfWeights);
+ BigDecimal currentRsLoadGoalBd =
+ currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
+ BigDecimal potentialCurrentRsNewLoadBd = new BigDecimal(0);
+ if (sumOfConnectedDSs != 0)
+ {
+ int connectedDSs = currentReplicationServerInfo.
+ getConnectedDSNumber();
+ BigDecimal potentialNewConnectedDSsBd =
+ new BigDecimal(connectedDSs - 1);
+ BigDecimal sumOfConnectedDSsBd =
+ new BigDecimal(sumOfConnectedDSs);
+ potentialCurrentRsNewLoadBd =
+ potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
+ mathContext);
+ }
+ BigDecimal potentialCurrentRsNewLoadDistanceBd =
+ currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
+ mathContext);
+
+ // What would be the new load distance for the other RSs ?
+ BigDecimal additionalDsLoadBd =
+ (new BigDecimal(1)).divide(
+ new BigDecimal(sumOfConnectedDSs), mathContext);
+ BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
+ sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
+ mathContext);
+
+ // Now compare both values: we must no disconnect the DS if this
+ // is for going in a situation where the load distance of the other
+ // RSs is the opposite of the future load distance of the local RS
+ // or we would evaluate that we should disconnect just after being
+ // arrived on the new RS. But we should disconnect if we reach the
+ // perfect balance (both values are 0).
+ MathContext roundMc =
+ new MathContext(6, RoundingMode.DOWN);
+ BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
+ potentialCurrentRsNewLoadDistanceBd.round(roundMc);
+ BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
+ potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
+
+ if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
+ BigDecimal.ZERO) != 0)
+ && (potentialCurrentRsNewLoadDistanceBdRounded.equals(
+ potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
+ {
+ // Avoid the yoyo effect, and keep the local DS connected to its
+ // current RS
+ return bestServers.get(currentRsServerId);
+ }
+ }
+
+ // Prepare a sorted list (from lowest to highest) or DS server ids
+ // connected to the current RS
+ ReplicationServerInfo currentRsInfo =
+ bestServers.get(currentRsServerId);
+ List<Integer> serversConnectedToCurrentRS =
+ currentRsInfo.getConnectedDSs();
+ List<Integer> sortedServers = new ArrayList<Integer>(
+ serversConnectedToCurrentRS);
+ Collections.sort(sortedServers);
+
+ // Go through the list of DSs to disconnect and see if the local
+ // server is part of them.
+ int index = 0;
+ while (overloadingDSsNumber > 0)
+ {
+ int severToDisconnectId = sortedServers.get(index);
+ if (severToDisconnectId == localServerId)
+ {
+ // The local server is part of the DSs to disconnect
+ return null;
+ }
+ overloadingDSsNumber--;
+ index++;
+ }
+
+ // The local server is not part of the servers to disconnect from the
+ // current RS.
+ return bestServers.get(currentRsServerId);
+ } else
+ {
+ // The average distance of the other RSs does not show a lack of DSs:
+ // no need to disconnect any DS from the current RS.
+ return bestServers.get(currentRsServerId);
+ }
+ } else
+ {
+ // The RS load goal is reached or there are not enough DSs connected to
+ // it to reach it: do not disconnect from this RS and return rsInfo for
+ // this RS
+ return bestServers.get(currentRsServerId);
+ }
+ }
}
/**
@@ -1679,28 +2191,6 @@
}
/**
- * Starts the same group id poller.
- */
- private void startSameGroupIdPoller()
- {
- sameGroupIdPoller = new SameGroupIdPoller();
- sameGroupIdPoller.start();
- }
-
- /**
- * Stops the same group id poller.
- */
- private synchronized void stopSameGroupIdPoller()
- {
- if (sameGroupIdPoller != null)
- {
- sameGroupIdPoller.shutdown();
- sameGroupIdPoller.waitForShutdown();
- sameGroupIdPoller = null;
- }
- }
-
- /**
* Stop the heartbeat monitor thread.
*/
synchronized void stopRSHeartBeatMonitoring()
@@ -1926,7 +2416,8 @@
* Receive a message.
* This method is not multithread safe and should either always be
* called in a single thread or protected by a locking mechanism
- * before being called.
+ * before being called. This is a wrapper to the method with a boolean version
+ * so that we do not have to modify existing tests.
*
* @return the received message
* @throws SocketTimeoutException if the timeout set by setSoTimeout
@@ -1934,6 +2425,26 @@
*/
public ReplicationMsg receive() throws SocketTimeoutException
{
+ return receive(false);
+ }
+
+ /**
+ * Receive a message.
+ * This method is not multithread safe and should either always be
+ * called in a single thread or protected by a locking mechanism
+ * before being called.
+ *
+ * @return the received message
+ * @throws SocketTimeoutException if the timeout set by setSoTimeout
+ * has expired
+ * @param allowReconnectionMechanism If true, this allows the reconnection
+ * mechanism to disconnect the broker if it detects that it should reconnect
+ * to another replication server because of some criteria defined by the
+ * algorithm where we choose a suitable replication server.
+ */
+ public ReplicationMsg receive(boolean allowReconnectionMechanism)
+ throws SocketTimeoutException
+ {
while (shutdown == false)
{
if (!connected)
@@ -1956,13 +2467,16 @@
{
WindowMsg windowMsg = (WindowMsg) msg;
sendWindow.release(windowMsg.getNumAck());
- }
- else if (msg instanceof TopologyMsg)
+ } else if (msg instanceof TopologyMsg)
{
- TopologyMsg topoMsg = (TopologyMsg)msg;
+ TopologyMsg topoMsg = (TopologyMsg) msg;
receiveTopo(topoMsg);
- }
- else if (msg instanceof StopMsg)
+ if (allowReconnectionMechanism)
+ {
+ // Reset wait time before next computation of best server
+ mustRunBestServerCheckingAlgorithm = 0;
+ }
+ } else if (msg instanceof StopMsg)
{
/*
* RS performs a proper disconnection
@@ -1974,8 +2488,7 @@
logError(message);
// Try to find a suitable RS
this.reStart(failingSession);
- }
- else if (msg instanceof MonitorMsg)
+ } else if (msg instanceof MonitorMsg)
{
// This is the response to a MonitorRequest that was sent earlier or
// the regular message of the monitoring publisher of the RS.
@@ -1997,16 +2510,53 @@
monitorResponse.notify();
}
- // Extract and store replication servers ServerStates
- rsStates = new HashMap<Integer, ServerState>();
+ // Update the replication servers ServerStates with new received info
it = monitorMsg.rsIterator();
while (it.hasNext())
{
int srvId = it.next();
- rsStates.put(srvId, monitorMsg.getRSServerState(srvId));
+ ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
+ if (rsInfo != null)
+ {
+ rsInfo.update(monitorMsg.getRSServerState(srvId));
+ }
}
- }
- else
+
+ // Now if it is allowed, compute the best replication server to see if
+ // it is still the one we are currently connected to. If not,
+ // disconnect properly and let the connection algorithm re-connect to
+ // best replication server
+ if (allowReconnectionMechanism)
+ {
+ mustRunBestServerCheckingAlgorithm++;
+ if (mustRunBestServerCheckingAlgorithm == 2)
+ {
+ // Stable topology (no topo msg since few seconds): proceed with
+ // best server checking.
+ ReplicationServerInfo bestServerInfo =
+ computeBestReplicationServer(false, rsServerId, state,
+ replicationServerInfos, serverId, baseDn, groupId,
+ generationID);
+
+ if ((bestServerInfo == null) ||
+ (bestServerInfo.getServerId() != rsServerId))
+ {
+ // The best replication server is no more the one we are
+ // currently using. Disconnect properly then reconnect.
+ Message message =
+ NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(),
+ Integer.toString(serverId),
+ Integer.toString(rsServerId),
+ rsServerUrl);
+ logError(message);
+ reStart();
+ }
+
+ // Reset wait time before next computation of best server
+ mustRunBestServerCheckingAlgorithm = 0;
+ }
+ }
+ } else
{
return msg;
}
@@ -2018,15 +2568,14 @@
if (shutdown == false)
{
if ((session == null) || (!session.closeInitiated()))
-
{
/*
* We did not initiate the close on our side, log an error message.
*/
Message message =
ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer,
- Integer.toString(rsServerId), baseDn.toString(),
- Integer.toString(serverId));
+ Integer.toString(rsServerId), baseDn.toString(),
+ Integer.toString(serverId));
logError(message);
}
this.reStart(failingSession);
@@ -2066,7 +2615,8 @@
}
}
} catch (InterruptedException e)
- {}
+ {
+ }
return replicaStates;
}
@@ -2081,7 +2631,7 @@
{
try
{
- updateDoneCount ++;
+ updateDoneCount++;
if ((updateDoneCount >= halfRcvWindow) && (session != null))
{
session.publish(new WindowMsg(updateDoneCount));
@@ -2103,10 +2653,9 @@
if (debugEnabled())
{
debugInfo("ReplicationBroker " + serverId + " is stopping and will" +
- " close the connection to replication server " + rsServerId + " for"
- + " domain " + baseDn);
+ " close the connection to replication server " + rsServerId + " for" +
+ " domain " + baseDn);
}
- stopSameGroupIdPoller();
stopRSHeartBeatMonitoring();
stopChangeTimeHeartBeatPublishing();
replicationServer = "stopped";
@@ -2237,8 +2786,8 @@
* @param groupId The new group id to use
*/
public boolean changeConfig(
- Collection<String> replicationServers, int window, long heartbeatInterval,
- byte groupId)
+ Collection<String> replicationServers, int window, long heartbeatInterval,
+ byte groupId)
{
// These parameters needs to be renegotiated with the ReplicationServer
// so if they have changed, that requires restarting the session with
@@ -2248,11 +2797,11 @@
// A new session is necessary only when information regarding
// the connection is modified
if ((servers == null) ||
- (!(replicationServers.size() == servers.size()
- && replicationServers.containsAll(servers))) ||
- window != this.maxRcvWindow ||
- heartbeatInterval != this.heartbeatInterval ||
- (groupId != this.groupId))
+ (!(replicationServers.size() == servers.size() && replicationServers.
+ containsAll(servers))) ||
+ window != this.maxRcvWindow ||
+ heartbeatInterval != this.heartbeatInterval ||
+ (groupId != this.groupId))
{
needToRestartSession = true;
}
@@ -2313,152 +2862,6 @@
}
/**
- * In case we are connected to a RS with a different group id, we use this
- * thread to poll presence of a RS with the same group id as ours. If a RS
- * with the same group id is available, we close the session to force
- * reconnection. Reconnection will choose a server with the same group id.
- */
- private class SameGroupIdPoller extends DirectoryThread
- {
-
- private boolean sameGroupIdPollershutdown = false;
- private boolean terminated = false;
- // Sleep interval in ms
- private static final int SAME_GROUP_ID_POLLER_PERIOD = 5000;
-
- public SameGroupIdPoller()
- {
- super("Replication Broker Same Group Id Poller for " + baseDn.toString() +
- " and group id " + groupId + " in server id " + serverId);
- }
-
- /**
- * Wait for the completion of the same group id poller.
- */
- public void waitForShutdown()
- {
- try
- {
- while (terminated == false)
- {
- Thread.sleep(50);
- }
- } catch (InterruptedException e)
- {
- // exit the loop if this thread is interrupted.
- }
- }
-
- /**
- * Shutdown the same group id poller.
- */
- public void shutdown()
- {
- sameGroupIdPollershutdown = true;
- }
-
- /**
- * Permanently look for RS with our group id and if found, break current
- * connection to force reconnection to a new server with the right group id.
- */
- @Override
- public void run()
- {
- boolean done = false;
-
- if (debugEnabled())
- {
- TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() +
- " started.");
- }
-
- while ((!done) && (!sameGroupIdPollershutdown))
- {
- // Sleep some time between checks
- try
- {
- Thread.sleep(SAME_GROUP_ID_POLLER_PERIOD);
- } catch (InterruptedException e)
- {
- // Stop as we are interrupted
- sameGroupIdPollershutdown = true;
- }
- synchronized (connectPhaseLock)
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("Running SameGroupIdPoller for: " +
- baseDn.toString());
- }
- if (session != null) // Check only if not already disconnected
-
- {
- for (String server : servers)
- {
- // Do not ask the RS we are connected to as it has for sure the
- // wrong group id
- if (server.equals(rsServerUrl))
- continue;
-
- // Connect to server and get reply message
- ServerInfo serverInfo =
- performPhaseOneHandshake(server, false);
-
- // Is it a server with our group id ?
- if (serverInfo != null)
- {
- if (groupId == serverInfo.getGroupId())
- {
- // Found one server with the same group id as us, disconnect
- // session to force reconnection to a server with same group
- // id.
- Message message = NOTE_NEW_SERVER_WITH_SAME_GROUP_ID.get(
- Byte.toString(groupId), baseDn.toString(),
- Integer.toString(serverId));
- logError(message);
-
- if (protocolVersion >=
- ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // V4 protocol introduces a StopMsg to properly end
- // communications
- try
- {
- session.publish(new StopMsg());
- } catch (IOException ioe)
- {
- // Anyway, going to close session, so nothing to do
- }
- }
- try
- {
- session.close();
- } catch (Exception e)
- {
- // The session was already closed, just ignore.
- }
- session = null;
- done = true; // Terminates thread as did its job.
-
- break;
- }
- }
- } // for server
-
- }
- }
- }
-
- terminated = true;
- if (debugEnabled())
- {
- TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() +
- " terminated.");
- }
- }
- }
-
- /**
* Signals the RS we just entered a new status.
* @param newStatus The status the local DS just entered
*/
@@ -2505,7 +2908,42 @@
*/
public List<RSInfo> getRsList()
{
- return rsList;
+ List<RSInfo> result = new ArrayList<RSInfo>();
+
+ for (ReplicationServerInfo replicationServerInfo :
+ replicationServerInfos.values())
+ {
+ result.add(replicationServerInfo.toRSInfo());
+ }
+ return result;
+ }
+
+ /**
+ * Computes the list of DSs connected to a particular RS.
+ * @param rsId The RS id of the server one wants to know the connected DSs
+ * @param dsList The list of DSinfo from which to compute things
+ * @return The list of connected DSs to the server rsId
+ */
+ private List<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList)
+ {
+ List<Integer> connectedDSs = new ArrayList<Integer>();
+
+ if (rsServerId == rsId)
+ {
+ // If we are computing connected DSs for the RS we are connected
+ // to, we should count the local DS as the DSInfo of the local DS is not
+ // sent by the replication server in the topology message. We must count
+ // ourself as a connected server.
+ connectedDSs.add(serverId);
+ }
+
+ for (DSInfo dsInfo : dsList)
+ {
+ if (dsInfo.getRsId() == rsId)
+ connectedDSs.add(dsInfo.getDsId());
+ }
+
+ return connectedDSs;
}
/**
@@ -2516,13 +2954,49 @@
*/
public void receiveTopo(TopologyMsg topoMsg)
{
- // Store new lists
- synchronized(getDsList())
+ // Store new DS list
+ dsList = topoMsg.getDsList();
+
+ // Update replication server info list with the received topology
+ // information
+ List<Integer> rsToKeepList = new ArrayList<Integer>();
+ for (RSInfo rsInfo : topoMsg.getRsList())
{
- synchronized(getRsList())
+ int rsId = rsInfo.getId();
+ rsToKeepList.add(rsId); // Mark this server as still existing
+ List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
+ ReplicationServerInfo replicationServerInfo =
+ replicationServerInfos.get(rsId);
+ if (replicationServerInfo == null)
{
- dsList = topoMsg.getDsList();
- rsList = topoMsg.getRsList();
+ // New replication server, create info for it add it to the list
+ replicationServerInfo =
+ new ReplicationServerInfo(rsInfo, connectedDSs);
+ // Set the locally configured flag for this new RS only if it is
+ // configured
+ updateRSInfoLocallyConfiguredStatus(replicationServerInfo);
+ replicationServerInfos.put(rsId, replicationServerInfo);
+ } else
+ {
+ // Update the existing info for the replication server
+ replicationServerInfo.update(rsInfo, connectedDSs);
+ }
+ }
+
+ /**
+ * Now remove any replication server that may have disappeared from the
+ * topology.
+ */
+ Iterator<Entry<Integer, ReplicationServerInfo>> rsInfoIt =
+ replicationServerInfos.entrySet().iterator();
+ while (rsInfoIt.hasNext())
+ {
+ Entry<Integer, ReplicationServerInfo> rsInfoEntry = rsInfoIt.next();
+ if (!rsToKeepList.contains(rsInfoEntry.getKey()))
+ {
+ // This replication server has quit the topology, remove it from the
+ // list
+ rsInfoIt.remove();
}
}
if (domain != null)
@@ -2536,6 +3010,7 @@
}
}
}
+
/**
* Check if the broker could not find any Replication Server and therefore
* connection attempt failed.
@@ -2557,16 +3032,15 @@
{
ctHeartbeatPublisherThread =
new CTHeartbeatPublisherThread(
- "Replication CN Heartbeat sender for " +
- baseDn + " with " + getReplicationServer(),
- session, changeTimeHeartbeatSendInterval, serverId);
+ "Replication CN Heartbeat sender for " +
+ baseDn + " with " + getReplicationServer(),
+ session, changeTimeHeartbeatSendInterval, serverId);
ctHeartbeatPublisherThread.start();
- }
- else
+ } else
{
if (debugEnabled())
TRACER.debugInfo(this +
- " is not configured to send CN heartbeat interval");
+ " is not configured to send CN heartbeat interval");
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index ee1f043..b1a1150 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -719,7 +719,7 @@
ReplicationMsg msg;
try
{
- msg = broker.receive();
+ msg = broker.receive(true);
if (msg == null)
{
// The server is in the shutdown process
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
index a8800ec..506ef1a 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2009 Sun Microsystems, Inc.
+ * Copyright 2006-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.common;
@@ -69,7 +69,7 @@
// TODO Check result;
// Check update
- assertFalse(serverState.update(null));
+ assertFalse(serverState.update((ChangeNumber)null));
assertTrue(serverState.update(cn));
assertFalse(serverState.update(cn));
ChangeNumber cn1, cn2, cn3;
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index 13bfb26..85ad615 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -518,7 +518,7 @@
// Send topo view
List<RSInfo> rsList = new ArrayList<RSInfo>();
- RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1);
+ RSInfo rsInfo = new RSInfo(serverId, "localhost:" + port, generationId, groupId, 1);
rsList.add(rsInfo);
TopologyMsg topologyMsg = new TopologyMsg(new ArrayList<DSInfo>(),
rsList);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
index 7588ade..8c7baed 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -22,11 +22,14 @@
* CDDL HEADER END
*
*
- * Copyright 2008-2009 Sun Microsystems, Inc.
+ * Copyright 2008-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import static org.opends.server.replication.service.ReplicationBroker.*;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -39,6 +42,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.server.ReplicationServer;
@@ -46,8 +50,8 @@
import org.testng.annotations.Test;
/**
- * Test the algorithm for find the best replication server among the configured
- * ones.
+ * Test the algorithm for finding the best replication server among the
+ * configured ones.
*/
public class ComputeBestServerTest extends ReplicationTestCase
{
@@ -93,7 +97,8 @@
mySt.update(cn);
// Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+ HashMap<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
// State for server 1
ServerState aState = new ServerState();
@@ -102,14 +107,15 @@
cn = new ChangeNumber(0L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
+ ReplicationServerInfo bestServer =
+ computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ assertEquals(bestServer.getServerURL(),
+ WINNER, "Wrong best replication server.");
}
/**
@@ -139,7 +145,8 @@
mySt.update(cn);
// Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+ HashMap<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
// State for server 1
ServerState aState = new ServerState();
@@ -150,14 +157,15 @@
cn = new ChangeNumber(0L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
+ ReplicationServerInfo bestServer =
+ computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ assertEquals(bestServer.getServerURL(),
+ WINNER, "Wrong best replication server.");
}
/**
@@ -191,7 +199,8 @@
mySt.update(cn);
// Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+ HashMap<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
// State for server 1
ServerState aState = new ServerState();
@@ -200,14 +209,15 @@
cn = new ChangeNumber(0L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
+ ReplicationServerInfo bestServer =
+ computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ assertEquals(bestServer.getServerURL(),
+ WINNER, "Wrong best replication server.");
}
/**
@@ -239,7 +249,8 @@
mySt.update(cn);
// Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+ HashMap<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
// State for server 1
ServerState aState = new ServerState();
@@ -250,14 +261,15 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
+ ReplicationServerInfo bestServer =
+ computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ assertEquals(bestServer.getServerURL(),
+ WINNER, "Wrong best replication server.");
}
/**
@@ -290,7 +302,8 @@
mySt.update(cn);
// Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+ HashMap<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
// State for server 1
ServerState aState = new ServerState();
@@ -301,9 +314,9 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
// State for server 2
aState = new ServerState();
@@ -314,14 +327,15 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
+ ReplicationServerInfo bestServer =
+ computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ assertEquals(bestServer.getServerURL(),
+ WINNER, "Wrong best replication server.");
}
/**
@@ -354,7 +368,8 @@
mySt.update(cn);
// Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+ HashMap<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
// State for server 1
ServerState aState = new ServerState();
@@ -367,9 +382,9 @@
// This server has less changes than the other one but it has the same
// group id as us so he should be the winner
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
// State for server 2
aState = new ServerState();
@@ -380,14 +395,15 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(12, LOOSER1, null, 0, aState, (short)0, 0L,
false, (byte)2, 0);
- rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
+ ReplicationServerInfo bestServer =
+ computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ assertEquals(bestServer.getServerURL(),
+ WINNER, "Wrong best replication server.");
}
/**
@@ -420,7 +436,8 @@
mySt.update(cn);
// Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+ HashMap<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
// State for server 1
ServerState aState = new ServerState();
@@ -431,9 +448,9 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
false, (byte)2, 0);
- rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
// State for server 2
aState = new ServerState();
@@ -444,14 +461,15 @@
cn = new ChangeNumber(2L, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
false, (byte)2, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
+ ReplicationServerInfo bestServer =
+ computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ assertEquals(bestServer.getServerURL(),
+ WINNER, "Wrong best replication server.");
}
/**
@@ -485,7 +503,8 @@
mySt.update(cn);
// Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+ HashMap<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
// State for server 1
ServerState aState = new ServerState();
@@ -496,9 +515,9 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
// State for server 2
aState = new ServerState();
@@ -509,9 +528,9 @@
cn = new ChangeNumber(4L, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
// State for server 3
aState = new ServerState();
@@ -522,14 +541,15 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(13, LOOSER2, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
+ ReplicationServerInfo bestServer =
+ computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ assertEquals(bestServer.getServerURL(),
+ WINNER, "Wrong best replication server.");
}
/**
@@ -563,7 +583,8 @@
mySt.update(cn);
// Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+ HashMap<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
// State for server 1
ServerState aState = new ServerState();
@@ -574,9 +595,9 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
// State for server 2
aState = new ServerState();
@@ -587,9 +608,9 @@
cn = new ChangeNumber(3L, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(12, LOOSER2, null, 0, aState, (short)0, 0L,
false, (byte)2, 0);
- rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
// State for server 3
aState = new ServerState();
@@ -602,14 +623,15 @@
// This server has less changes than looser2 but it has the same
// group id as us so he should be the winner
replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(13, WINNER, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
+ ReplicationServerInfo bestServer =
+ computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ assertEquals(bestServer.getServerURL(),
+ WINNER, "Wrong best replication server.");
}
/**
@@ -641,7 +663,8 @@
mySt.update(cn);
// Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+ HashMap<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
// State for server 1
ServerState aState = new ServerState();
@@ -652,278 +675,17 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
+ ReplicationServerInfo bestServer =
+ computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte)1, 0L);
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ assertEquals(bestServer.getServerURL(),
+ WINNER, "Wrong best replication server.");
}
- /**
- * Test with 2 replication servers, late.
- *
- * @throws Exception If a problem occurred
- */
- @Test
- public void test2ServersLate() throws Exception
- {
- String testCase = "test2ServersLate";
-
- debugInfo("Starting " + testCase);
-
- // definitions for server ids
- int myId1 = 1;
- int myId2 = 2;
- int myId3 = 3;
- // definitions for server names
- final String WINNER = "winner";
- final String LOOSER1 = "looser1";
-
- // Create my state
- ServerState mySt = new ServerState();
- ChangeNumber cn = new ChangeNumber(2L, 0, myId1);
- mySt.update(cn);
- cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
- mySt.update(cn);
- cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
- mySt.update(cn);
-
- // Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
-
- // State for server 1
- ServerState aState = new ServerState();
- cn = new ChangeNumber(0L, 0, myId1);
- aState.update(cn);
- cn = new ChangeNumber(10L, 0, myId2);
- aState.update(cn);
- cn = new ChangeNumber(10L, 0, myId3);
- aState.update(cn);
- ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
- false, (byte)1, 0);
- rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
-
- // State for server 2
- aState = new ServerState();
- cn = new ChangeNumber(1L, 0, myId1);
- aState.update(cn);
- cn = new ChangeNumber(0L, 0, myId2);
- aState.update(cn);
- cn = new ChangeNumber(0L, 0, myId3);
- aState.update(cn);
- replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
- false, (byte)1, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
-
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
-
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
- }
-
- /**
- * Test with 3 replication servers, late.
- *
- * @throws Exception If a problem occurred
- */
- @Test
- public void test3ServersLate() throws Exception
- {
- String testCase = "test3ServersLate";
-
- debugInfo("Starting " + testCase);
-
- // definitions for server ids
- int myId1 = 1;
- int myId2 = 2;
- int myId3 = 3;
- // definitions for server names
- final String WINNER = "winner";
- final String LOOSER1 = "looser1";
- final String LOOSER2 = "looser2";
-
- // Create my state
- ServerState mySt = new ServerState();
- ChangeNumber cn = new ChangeNumber(4L, 0, myId1);
- mySt.update(cn);
- cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
- mySt.update(cn);
- cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
- mySt.update(cn);
-
- // Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
-
- // State for server 1
- ServerState aState = new ServerState();
- cn = new ChangeNumber(1L, 0, myId1);
- aState.update(cn);
- cn = new ChangeNumber(10L, 0, myId2);
- aState.update(cn);
- cn = new ChangeNumber(10L, 0, myId3);
- aState.update(cn);
- ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
- false, (byte)1, 0);
- rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
-
- // State for server 2
- aState = new ServerState();
- cn = new ChangeNumber(3L, 0, myId1);
- aState.update(cn);
- cn = new ChangeNumber(0L, 0, myId2);
- aState.update(cn);
- cn = new ChangeNumber(0L, 0, myId3);
- aState.update(cn);
- replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
- false, (byte)1, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
-
- // State for server 3
- aState = new ServerState();
- cn = new ChangeNumber(2L, 0, myId1);
- aState.update(cn);
- cn = new ChangeNumber(10L, 0, myId2);
- aState.update(cn);
- cn = new ChangeNumber(10L, 0, myId3);
- aState.update(cn);
- replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
- false, (byte)1, 0);
- rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
-
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
-
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
- }
-
- /**
- * Test with 6 replication servers, some up, some late, one null
- *
- * @throws Exception If a problem occurred
- */
- @Test
- public void test6ServersMixed() throws Exception
- {
- String testCase = "test6ServersMixed";
-
- debugInfo("Starting " + testCase);
-
- // definitions for server ids
- int myId1 = 1;
- int myId2 = 2;
- int myId3 = 3;
-
- // definitions for server names
- final String WINNER = "winner";
- final String LOOSER1 = "looser1";
- final String LOOSER2 = "looser2";
- final String LOOSER3 = "looser3";
- final String LOOSER4 = "looser4";
- final String LOOSER5 = "looser5";
-
- // Create my state
- ServerState mySt = new ServerState();
- ChangeNumber cn = new ChangeNumber(5L, 0, myId1);
- mySt.update(cn);
- cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
- mySt.update(cn);
- cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
- mySt.update(cn);
-
- // Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
-
- // State for server 1
- ServerState aState = new ServerState();
- cn = new ChangeNumber(4L, 0, myId1);
- aState.update(cn);
- cn = new ChangeNumber(10L, 0, myId2);
- aState.update(cn);
- cn = new ChangeNumber(10L, 0, myId3);
- aState.update(cn);
- ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
- false, (byte)1, 0);
- rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
-
- // State for server 2
- aState = new ServerState();
- cn = new ChangeNumber(7L, 0, myId1);
- aState.update(cn);
- cn = new ChangeNumber(6L, 0, myId2);
- aState.update(cn);
- cn = new ChangeNumber(5L, 0, myId3);
- aState.update(cn);
- replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
- false, (byte)1, 0);
- rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
-
- // State for server 3
- aState = new ServerState();
- cn = new ChangeNumber(3L, 0, myId1);
- aState.update(cn);
- cn = new ChangeNumber(10L, 0, myId2);
- aState.update(cn);
- cn = new ChangeNumber(10L, 0, myId3);
- aState.update(cn);
- replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
- false, (byte)1, 0);
- rsInfos.put(LOOSER3, ServerInfo.newServerInfo(replServerStartMsg));
-
- // State for server 4
- aState = new ServerState();
- cn = new ChangeNumber(6L, 0, myId1);
- aState.update(cn);
- cn = new ChangeNumber(6L, 0, myId2);
- aState.update(cn);
- cn = new ChangeNumber(8L, 0, myId3);
- aState.update(cn);
- replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
- false, (byte)1, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
-
- // State for server 5 (null one for our serverid)
- aState = new ServerState();
- cn = new ChangeNumber(5L, 0, myId2);
- aState.update(cn);
- cn = new ChangeNumber(5L, 0, myId3);
- aState.update(cn);
- replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
- false, (byte)1, 0);
- rsInfos.put(LOOSER4, ServerInfo.newServerInfo(replServerStartMsg));
-
- // State for server 6
- aState = new ServerState();
- cn = new ChangeNumber(5L, 0, myId1);
- aState.update(cn);
- cn = new ChangeNumber(7L, 0, myId2);
- aState.update(cn);
- cn = new ChangeNumber(6L, 0, myId3);
- aState.update(cn);
- replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
- false, (byte)1, 0);
- rsInfos.put(LOOSER5, ServerInfo.newServerInfo(replServerStartMsg));
-
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
-
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
- }
-
-
@DataProvider(name = "create3ServersData")
public Object[][] create3ServersData() {
return new Object[][] {
@@ -934,25 +696,26 @@
{ 4, 2, 3, true, 4, 2, 3, false, 4, 2, 3, false},
// test that the local ServerID is more important than the others
- { 3, 0, 0, false, 1, 100, 100, false, 2, 100, 100, false},
+ { 4, 0, 0, false, 2, 100, 100, false, 1, 100, 100, false},
- // test that the local RS is chosen first even if it is a bit late
- { 4, 1, 1, true, 4, 2, 3, false, 4, 2, 3, false},
-
- // test that the local RS is not chosen first when it is very late
- { 4, 1000, 1000, false, 4, 2, 3, true, 4, 2, 1000, true},
+ // test that a remote RS is chosen first when up to date when the local
+ // one is late
+ { 4, 1, 1, false, 3, 1, 1, true, 3, 1, 1, false},
// test that the local RS is not chosen first when it is missing
// local changes
- { 4, 1, 1, false, 3, 2, 3, true, 1, 1, 1, false},
+ { 4, 1, 1, false, 3, 2, 3, false, 1, 1, 1, true},
- // test that the local RS is not chosen first when it is missing
- // more local changes than another RS
- { 4, 1, 1, false, 2, 2, 3, true, 1, 1, 1, false},
+ // test that a RS which is more up to date than the DS is chosen
+ { 5, 1, 1, false, 2, 0, 0, false, 1, 1, 1, false},
+
+ // test that a RS which is more up to date than the DS is chosen even
+ // is some RS with the same last change from the DS
+ { 5, 1, 1, false, 4, 0, 0, false, 4, 1, 1, false},
// test that the local RS is chosen first when it is missing
- // the same local changes as the other RS
- { 3, 1, 1, true, 3, 1, 1, false, 3, 1, 1, false},
+ // the same local changes as the other RSs
+ { 3, 1, 1, true, 2, 1, 1, false, 3, 1, 1, false},
};
}
@@ -990,7 +753,8 @@
mySt.update(cn);
// Create replication servers info list
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+ HashMap<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
// State for server 1
ServerState aState = new ServerState();
@@ -1001,9 +765,9 @@
cn = new ChangeNumber(looser1T3, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
if (looser1IsLocal)
ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER1);
@@ -1016,9 +780,9 @@
cn = new ChangeNumber(winnerT3, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
if (winnerIsLocal)
ReplicationServer.onlyForTestsAddlocalReplicationServer(WINNER);
@@ -1031,17 +795,1025 @@
cn = new ChangeNumber(looser2T3, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(13, LOOSER2, null, 0, aState, (short)0, 0L,
false, (byte)1, 0);
- rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
+ rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
if (looser2IsLocal)
ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER2);
- String bestServer =
- computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
+ ReplicationServerInfo bestServer =
+ computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte) 1,
+ 0L);
ReplicationServer.onlyForTestsClearLocalReplicationServerList();
- assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ assertEquals(bestServer.getServerURL(),
+ WINNER, "Wrong best replication server.");
+ }
+
+ @DataProvider(name = "test3ServersMoreCriteria")
+ public Object[][] create3ServersMoreCriteriaData() {
+ return new Object[][] {
+ // Test that a RS is chosen if its group is ok whereas the other parameters
+ // are not ok
+ { 1L, 1L, (byte)1, false, 4L, 0L, (byte)2, false, 4L, 0L, (byte)3, false},
+
+ // Test that a RS is chosen if its genid is ok (all RS with same group)
+ // and state is not ok
+ { 1L, 0L, (byte)1, false, 4L, 1L, (byte)1, false, 4L, 2L, (byte)1, false},
+
+ // Test that a RS is chosen if all servers have wrong genid and group id
+ // but it is local
+ { 1L, 1L, (byte)2, true, 4L, 2L, (byte)3, false, 5L, 3L, (byte)4, false}
+ };
+ }
+
+ /**
+ * Test with 3 replication servers (see data provider)
+ */
+ @Test(dataProvider = "test3ServersMoreCriteria")
+ public void test3ServersMoreCriteria(
+ long winnerT1, long winnerGenId, byte winnerGroupId, boolean winnerIsLocal,
+ long looser1T1, long looser1GenId, byte looser1GroupId, boolean looser1IsLocal,
+ long looser2T1, long looser2GenId, byte looser2GroupId, boolean looser2IsLocal)
+ throws Exception
+ {
+ String testCase = "test3ServersMoreCriteria";
+
+ debugInfo("Starting " + testCase);
+
+ // definitions for server ids
+ int myId1 = 1;
+ int myId2 = 2;
+ int myId3 = 3;
+
+ // definitions for server names
+ final String WINNER = "localhost:123";
+ final String LOOSER1 = "localhost:456";
+ final String LOOSER2 = "localhost:789";
+
+ // Create my state
+ ServerState mySt = new ServerState();
+ ChangeNumber cn = new ChangeNumber(4L, 0, myId1);
+ mySt.update(cn);
+
+ // Create replication servers info list
+ HashMap<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
+
+ // State for server 1
+ ServerState aState = new ServerState();
+ cn = new ChangeNumber(looser1T1, 0, myId1);
+ aState.update(cn);
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, looser1GenId,
+ false, looser1GroupId, 0);
+ rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
+ if (looser1IsLocal)
+ ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER1);
+
+ // State for server 2
+ aState = new ServerState();
+ cn = new ChangeNumber(winnerT1, 0, myId1);
+ aState.update(cn);
+ replServerStartMsg =
+ new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, winnerGenId,
+ false, winnerGroupId, 0);
+ rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
+ if (winnerIsLocal)
+ ReplicationServer.onlyForTestsAddlocalReplicationServer(WINNER);
+
+ // State for server 3
+ aState = new ServerState();
+ cn = new ChangeNumber(looser2T1, 0, myId1);
+ aState.update(cn);
+ replServerStartMsg =
+ new ReplServerStartMsg(13, LOOSER2, null, 0, aState, (short)0, looser2GenId,
+ false, looser2GroupId, 0);
+ rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
+ if (looser2IsLocal)
+ ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER2);
+
+ ReplicationServerInfo bestServer =
+ computeBestReplicationServer(true, -1, mySt, rsInfos, myId1, " ", (byte) 1,
+ 0L);
+
+ ReplicationServer.onlyForTestsClearLocalReplicationServerList();
+
+ assertEquals(bestServer.getServerURL(),
+ WINNER, "Wrong best replication server.");
+ }
+
+ @DataProvider(name = "testComputeBestServerForWeightProvider")
+ public Object[][] testComputeBestServerForWeightProvider() {
+
+ Object[][] testData = new Object[24][];
+
+ HashMap<Integer, ReplicationServerInfo> rsInfos = null;
+ new HashMap<Integer, ReplicationServerInfo>();
+ RSInfo rsInfo = null;
+ List<Integer> connectedDSs = null;
+ Object[] params = null;
+
+ /************************
+ * First connection tests
+ ************************/
+
+ /**
+ * 1 RS, no connected DSs
+ * Expected winner: the RS
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "AwinnerHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = -1; // current RS id
+ params[2] = -1; // local DS id
+ params[3] = "AwinnerHost:123"; // winner url
+ testData[0] = params;
+
+ /**
+ * 2 RSs with TL=0.5, no connected DSs
+ * Excepted winner: first in the list
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "BwinnerHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = -1; // current RS id
+ params[2] = -1; // local DS id
+ params[3] = rsInfos.values().iterator().next().getServerURL(); // winner url
+ testData[1] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=0.5 - CL=1.0 - DS=1 ; RS2: TL=0.5 - CL=0 - DS=0
+ * Excepted winner: R2 (still no connected DS)
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "CwinnerHost:456", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = -1; // current RS id
+ params[2] = -1; // local DS id
+ params[3] = "CwinnerHost:456"; // winner url
+ testData[2] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=0.5 - CL=0.5 - DS=1 ; RS2: TL=0.5 - CL=0.5 - DS=1
+ * Excepted winner: first in the list as both RSs reached TL
+ * and have same weight
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "DwinnerHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(101);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = -1; // current RS id
+ params[2] = -1; // local DS id
+ params[3] = rsInfos.values().iterator().next().getServerURL(); // winner url
+ testData[3] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=0.5 - CL=2/3 - DS=2 ; RS2: TL=0.5 - CL=1/3 - DS=1
+ * Excepted winner: RS2 -> 2 DSs on each RS
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "EwinnerHost:456", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(101);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = -1; // current RS id
+ params[2] = -1; // local DS id
+ params[3] = "EwinnerHost:456"; // winner url
+ testData[4] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=1/3 - CL=0.5 - DS=1 ; RS2: TL=2/3 - CL=0.5 - DS=1
+ * Excepted winner: RS2 -> go to perfect load balance
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "FwinnerHost:456", 0L, (byte)1, 2);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(101);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = -1; // current RS id
+ params[2] = -1; // local DS id
+ params[3] = "FwinnerHost:456"; // winner url
+ testData[5] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=1/3 - CL=1/3 - DS=1 ; RS2: TL=2/3 - CL=2/3 - DS=2
+ * Excepted winner: RS2 -> already load balanced so choose server with the
+ * highest weight
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "GwinnerHost:456", 0L, (byte)1, 2);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(101);
+ connectedDSs.add(102);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = -1; // current RS id
+ params[2] = -1; // local DS id
+ params[3] = "GwinnerHost:456"; // winner url
+ testData[6] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=1/3 - CL=1/3 - DS=2 ; RS2: TL=2/3 - CL=2/3 - DS=4
+ * Excepted winner: RS2 -> already load balanced so choose server with the
+ * highest weight
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "HwinnerHost:456", 0L, (byte)1, 2);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(101);
+ connectedDSs.add(102);
+ connectedDSs.add(103);
+ connectedDSs.add(104);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = -1; // current RS id
+ params[2] = -1; // local DS id
+ params[3] = "HwinnerHost:456"; // winner url
+ testData[7] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=1/6 - CL=1/6 - DS=1 ; RS2: TL=2/6 - CL=2/6 - DS=2 ; RS3: TL=3/6 - CL=3/6 - DS=3
+ * Excepted winner: RS3 -> already load balanced so choose server with the
+ * highest weight
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 2);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(101);
+ connectedDSs.add(102);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(13, "IwinnerHost:789", 0L, (byte)1, 3);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(201);
+ connectedDSs.add(202);
+ connectedDSs.add(203);
+ rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = -1; // current RS id
+ params[2] = -1; // local DS id
+ params[3] = "IwinnerHost:789"; // winner url
+ testData[8] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=5/10 - CL=3/9 - DS=3 ; RS2: TL=3/10 - CL=5/9 - DS=5 ; RS3: TL=2/10 - CL=1/9 - DS=1
+ * Excepted winner: RS1 -> misses more DSs than RS3
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "JwinnerHost:123", 0L, (byte)1, 5);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ connectedDSs.add(3);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 3);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(101);
+ connectedDSs.add(102);
+ connectedDSs.add(103);
+ connectedDSs.add(104);
+ connectedDSs.add(105);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 2);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(201);
+ rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = -1; // current RS id
+ params[2] = -1; // local DS id
+ params[3] = "JwinnerHost:123"; // winner url
+ testData[9] = params;
+
+ /*************************
+ * Already connected tests
+ *************************/
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=0.5 - CL=0.5 - DS=1 ; RS2: TL=0.5 - CL=0.5 - DS=1
+ * Excepted winner: RS2 (stay connected to it as load correctly spread)
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "KwinnerHost:456", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(101);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 12; // current RS id
+ params[2] = 101; // local DS id
+ params[3] = "KwinnerHost:456"; // winner url
+ testData[10] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=0.5 - CL=1.0 - DS=2 ; RS2: TL=0.5 - CL=0.0 - DS=0
+ * Excepted winner: RS2 (one must disconnect from RS1)
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "LwinnerHost:456", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 11; // current RS id
+ params[2] = 1; // local DS id
+ params[3] = null; // winner url
+ testData[11] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=0.5 - CL=1.0 - DS=2 ; RS2: TL=0.5 - CL=0.0 - DS=0
+ * Excepted winner: RS1 (one server must disconnect from RS1 but it is the
+ * one with the lowest id so not DS with server id 2)
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "MwinnerHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 11; // current RS id
+ params[2] = 2; // local DS id
+ params[3] = "MwinnerHost:123"; // winner url
+ testData[12] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=0.3 - CL=0.3 - DS=6 ; RS2: TL=0.4 - CL=0.4 - DS=8 ;
+ * RS3: TL=0.1 - CL=0.1 - DS=2 ; RS4: TL=0.2 - CL=0.2 - DS=4
+ * Excepted winner: RS2 no change as load correctly spread
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 3);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ connectedDSs.add(3);
+ connectedDSs.add(4);
+ connectedDSs.add(5);
+ connectedDSs.add(6);
+
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "NwinnerHost:456", 0L, (byte)1, 4);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(101);
+ connectedDSs.add(102);
+ connectedDSs.add(103);
+ connectedDSs.add(104);
+ connectedDSs.add(105);
+ connectedDSs.add(106);
+ connectedDSs.add(107);
+ connectedDSs.add(108);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(201);
+ connectedDSs.add(202);
+ rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(14, "looserHost:1011", 0L, (byte)1, 2);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(301);
+ connectedDSs.add(302);
+ connectedDSs.add(303);
+ connectedDSs.add(304);
+ rsInfos.put(14, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 12; // current RS id
+ params[2] = 101; // local DS id
+ params[3] = "NwinnerHost:456"; // winner url
+ testData[13] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=0.3 - CL=0.2 - DS=4 ; RS2: TL=0.4 - CL=0.4 - DS=8 ;
+ * RS3: TL=0.1 - CL=0.1 - DS=2 ; RS4: TL=0.2 - CL=0.3 - DS=6
+ * Excepted winner: RS2: no change load ok on current server and there is the
+ * possibility to arrange load for other servers with disconnection from
+ * 2 DSs from RS4 and reconnect them to RS1 (we moved these 2 servers from
+ * previous test where the loads were ok)
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 3);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ connectedDSs.add(3);
+ connectedDSs.add(4);
+
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "OwinnerHost:456", 0L, (byte)1, 4);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(101);
+ connectedDSs.add(102);
+ connectedDSs.add(103);
+ connectedDSs.add(104);
+ connectedDSs.add(105);
+ connectedDSs.add(106);
+ connectedDSs.add(107);
+ connectedDSs.add(108);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(201);
+ connectedDSs.add(202);
+ rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(14, "looserHost:1011", 0L, (byte)1, 2);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(301);
+ connectedDSs.add(302);
+ connectedDSs.add(303);
+ connectedDSs.add(304);
+ connectedDSs.add(305);
+ connectedDSs.add(306);
+ rsInfos.put(14, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 12; // current RS id
+ params[2] = 101; // local DS id
+ params[3] = "OwinnerHost:456"; // winner url
+ testData[14] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=0.3 - CL=0.2 - DS=4 ; RS2: TL=0.4 - CL=0.4 - DS=8 ;
+ * RS3: TL=0.1 - CL=0.1 - DS=2 ; RS4: TL=0.2 - CL=0.3 - DS=6
+ * Excepted winner: RS4 : 2 DSs should go away from RS4 and server id 302
+ * is one of the two lowest ids connected to RS4
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "PwinnerHost:123", 0L, (byte)1, 3);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ connectedDSs.add(3);
+ connectedDSs.add(4);
+
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 4);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(101);
+ connectedDSs.add(102);
+ connectedDSs.add(103);
+ connectedDSs.add(104);
+ connectedDSs.add(105);
+ connectedDSs.add(106);
+ connectedDSs.add(107);
+ connectedDSs.add(108);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(201);
+ connectedDSs.add(202);
+ rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(14, "looserHost:1011", 0L, (byte)1, 2);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(306);
+ connectedDSs.add(305);
+ connectedDSs.add(304);
+ connectedDSs.add(303);
+ connectedDSs.add(302);
+ connectedDSs.add(301);
+
+ rsInfos.put(14, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 14; // current RS id
+ params[2] = 302; // local DS id
+ params[3] = null; // winner url
+ testData[15] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=0.3 - CL=0.2 - DS=4 ; RS2: TL=0.4 - CL=0.4 - DS=8 ;
+ * RS3: TL=0.1 - CL=0.1 - DS=2 ; RS4: TL=0.2 - CL=0.3 - DS=6
+ * Excepted winner: RS1 : 2 DSs should go away from RS4 but server id 303
+ * is not one of the two lowest ids connected to RS4
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 3);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ connectedDSs.add(3);
+ connectedDSs.add(4);
+
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 4);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(101);
+ connectedDSs.add(102);
+ connectedDSs.add(103);
+ connectedDSs.add(104);
+ connectedDSs.add(105);
+ connectedDSs.add(106);
+ connectedDSs.add(107);
+ connectedDSs.add(108);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(201);
+ connectedDSs.add(202);
+ rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(14, "QwinnerHost:1011", 0L, (byte)1, 2);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(306);
+ connectedDSs.add(305);
+ connectedDSs.add(304);
+ connectedDSs.add(303);
+ connectedDSs.add(302);
+ connectedDSs.add(301);
+
+ rsInfos.put(14, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 14; // current RS id
+ params[2] = 303; // local DS id
+ params[3] = "QwinnerHost:1011"; // winner url
+ testData[16] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=0.3 - CL=0.2 - DS=4 ; RS2: TL=0.4 - CL=0.65 - DS=13 ;
+ * RS3: TL=0.1 - CL=0.1 - DS=2 ; RS4: TL=0.2 - CL=0.05 - DS=1
+ * Excepted winner: RS2: no change load ok on current server and there is the
+ * possibility to arrange load for other servers with disconnection from
+ * 2 DSs from RS4 and reconnect them to RS1 (we moved these 2 servers from
+ * previous test where the loads were ok)
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 3);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ connectedDSs.add(3);
+ connectedDSs.add(4);
+
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 4);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(113);
+ connectedDSs.add(112);
+ connectedDSs.add(111);
+ connectedDSs.add(110);
+ connectedDSs.add(109);
+ connectedDSs.add(108);
+ connectedDSs.add(107);
+ connectedDSs.add(106);
+ connectedDSs.add(105);
+ connectedDSs.add(104);
+ connectedDSs.add(103);
+ connectedDSs.add(102);
+ connectedDSs.add(101);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(201);
+ connectedDSs.add(202);
+ rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(14, "looserHost:1011", 0L, (byte)1, 2);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(301);
+
+ rsInfos.put(14, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 12; // current RS id
+ params[2] = 105; // local DS id
+ params[3] = null; // winner url
+ testData[17] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=0.5 - CL=2/3 - DS=2 ; RS2: TL=0.5 - CL=1/3 - DS=1
+ * Excepted winner: RS1. Local server should stay connected to current one
+ * as the balance cannot be done. We already have the nearest possible
+ * balance to the load goals: disconnection would cause a yoyo effect and
+ * the local server would not stop going and coming back to/from the other
+ * RS.
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "RwinnerHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(3);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 11; // current RS id
+ params[2] = 1; // local DS id
+ params[3] = "RwinnerHost:123"; // winner url
+ testData[18] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=0.5 - CL=2/3 - DS=2 ; RS2: TL=0.5 - CL=1/3 - DS=1
+ * Excepted winner: RS1. Local server should stay connected to current one
+ * as the balance cannot be done. We already have the nearest possible
+ * balance to the load goals: disconnection would cause a yoyo effect and
+ * the local server would not stop going and coming back to/from the other
+ * RS.
+ * Note: Same test as before, but not with the lowest local DS server id
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "SwinnerHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(3);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 11; // current RS id
+ params[2] = 2; // local DS id
+ params[3] = "SwinnerHost:123"; // winner url
+ testData[19] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=1/3 - CL=2/4 - DS=2 ; RS2: TL=1/3 - CL=1/4 - DS=1 ; RS3: TL=1/3 - CL=1/4 - DS=1
+ * Excepted winner: RS1. Local server should stay connected to current one
+ * as the balance cannot be done. We already have the nearest possible
+ * balance to the load goals: disconnection would cause a yoyo effect and
+ * the local server would not stop going and coming back between RSs.
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "TwinnerHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(3);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(4);
+ rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 11; // current RS id
+ params[2] = 1; // local DS id
+ params[3] = "TwinnerHost:123"; // winner url
+ testData[20] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=1/3 - CL=3/7 - DS=3 ; RS2: TL=1/3 - CL=2/7 - DS=2 ; RS3: TL=1/3 - CL=2/7 - DS=2
+ * Excepted winner: RS1. Local server should stay connected to current one
+ * as the balance cannot be done. We already have the nearest possible
+ * balance to the load goals: disconnection would cause a yoyo effect and
+ * the local server would not stop going and coming back between RSs.
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "UwinnerHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ connectedDSs.add(3);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(4);
+ connectedDSs.add(5);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(6);
+ connectedDSs.add(7);
+ rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 11; // current RS id
+ params[2] = 1; // local DS id
+ params[3] = "UwinnerHost:123"; // winner url
+ testData[21] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=1/3 - CL=2/3 - DS=2 ; RS2: TL=1/3 - CL=1/3 - DS=1 ; RS3: TL=1/3 - CL=0 - DS=0
+ * Excepted winner: RS3. Local server should disconnect for reconnection to
+ * RS3
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "looserHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(3);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(13, "VwinnerHost:789", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 11; // current RS id
+ params[2] = 1; // local DS id
+ params[3] = null; // winner url
+ testData[22] = params;
+
+ /**
+ * TL = target load
+ * CL = current load
+ * DS = connected DSs number
+ * RS1: TL=1/3 - CL=2/3 - DS=2 ; RS2: TL=1/3 - CL=1/3 - DS=1 ; RS3: TL=1/3 - CL=0 - DS=0
+ * Excepted winner: RS3. Local server (2) should stay connected while
+ * DS server id 1 should disconnect for reconnection to RS3
+ */
+
+ rsInfos = new HashMap<Integer, ReplicationServerInfo>();
+
+ rsInfo = new RSInfo(11, "WwinnerHost:123", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(1);
+ connectedDSs.add(2);
+ rsInfos.put(11, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(12, "looserHost:456", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ connectedDSs.add(3);
+ rsInfos.put(12, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ rsInfo = new RSInfo(13, "looserHost:789", 0L, (byte)1, 1);
+ connectedDSs = new ArrayList<Integer>();
+ rsInfos.put(13, new ReplicationServerInfo(rsInfo, connectedDSs));
+
+ params = new Object[4];
+ params[0] = rsInfos;
+ params[1] = 11; // current RS id
+ params[2] = 2; // local DS id
+ params[3] = "WwinnerHost:123"; // winner url
+ testData[23] = params;
+
+ return testData;
+ }
+
+ /**
+ * Test the method that chooses the best RS using the RS weights
+ */
+ @Test(dataProvider = "testComputeBestServerForWeightProvider")
+ public void testComputeBestServerForWeight(
+ Map<Integer, ReplicationServerInfo> servers, int currentRsServerId,
+ int localServerId, String winnerUrl)
+ throws Exception
+ {
+ String testCase = "testComputeBestServerForWeight";
+
+ debugInfo("Starting " + testCase);
+
+ ReplicationServerInfo bestServer =
+ computeBestServerForWeight(servers, currentRsServerId, localServerId);
+
+ if (winnerUrl == null)
+ {
+ // We expect null
+ String url = null;
+ if (bestServer != null)
+ {
+ url = bestServer.getServerURL();
+ }
+ assertNull(bestServer, "The best server should be null but is: " + url);
+ } else
+ {
+ assertNotNull(bestServer, "The best server should not be null");
+ assertEquals(bestServer.getServerURL(),
+ winnerUrl, "Wrong best replication server: " + bestServer.getServerURL());
+ }
}
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java
new file mode 100644
index 0000000..e21fa70
--- /dev/null
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java
@@ -0,0 +1,1318 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2009-2010 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import org.opends.server.types.DirectoryException;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.ReplicationServerDomain;
+import org.opends.server.types.DN;
+import org.testng.annotations.Test;
+import static org.testng.Assert.*;
+import static org.opends.server.TestCaseUtils.*;
+
+/**
+ * Test in real situations the algorithm for load balancing the DSs connections
+ * to the RSs. This uses the weights of the RSs. We concentrate the tests on
+ * weight only: all servers have the same group id, gen id an states.
+ */
+public class ReplicationServerLoadBalancingTest extends ReplicationTestCase
+{
+ // Number of DSs
+ private static final int NDS = 20;
+ // Number of RSs
+ private static final int NRS = 4;
+ private LDAPReplicationDomain rd[] = new LDAPReplicationDomain[NDS];
+ private ReplicationServer rs[] = new ReplicationServer[NRS];
+ private int[] rsPort = new int[NRS];
+
+ private static final int RS1_ID = 501;
+ private static final int RS2_ID = 502;
+ private static final int RS3_ID = 503;
+ private static final int RS4_ID = 504;
+
+ // The tracer object for the debug logger
+ private static final DebugTracer TRACER = getTracer();
+
+ private void debugInfo(String s)
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("** TEST **" + s);
+ }
+ }
+
+ private void initTest()
+ {
+ for (int i = 0 ; i < NDS; i++)
+ {
+ rd[i] = null;
+ }
+ for (int i = 0 ; i < NRS; i++)
+ {
+ rs[i] = null;
+ rsPort[i] = -1;
+ }
+ findFreePorts();
+ }
+
+ /**
+ * Find needed free TCP ports.
+ */
+ private void findFreePorts()
+ {
+ try
+ {
+ ServerSocket[] ss = new ServerSocket[NRS];
+
+ for (int i = 0; i < NRS; i++)
+ {
+ ss[i] = TestCaseUtils.bindFreePort();
+ rsPort[i] = ss[i].getLocalPort();
+ }
+ for (int i = 0; i < NRS; i++)
+ {
+ ss[i].close();
+ }
+ } catch (IOException e)
+ {
+ fail("Unable to determinate some free ports " +
+ stackTraceToSingleLineString(e));
+ }
+ }
+
+ private void endTest()
+ {
+ for (int i = 0 ; i < NDS; i++)
+ {
+ if (rd[i] != null)
+ {
+ rd[i].shutdown();
+ rd[i] = null;
+ }
+ }
+
+ try
+ {
+ // Clear any reference to a domain in synchro plugin
+ MultimasterReplication.deleteDomain(DN.decode(TEST_ROOT_DN_STRING));
+ } catch (DirectoryException ex)
+ {
+ fail("Error deleting reference to domain: " + TEST_ROOT_DN_STRING);
+ }
+
+ for (int i = 0; i < NRS; i++)
+ {
+ if (rs[i] != null)
+ {
+ rs[i].clearDb();
+ rs[i].remove();
+ rs[i] = null;
+ }
+ rsPort[i] = -1;
+ }
+ }
+
+ /**
+ * Creates the list of servers to represent the RS topology matching the
+ * passed test case.
+ */
+ private SortedSet<String> createRSListForTestCase(String testCase)
+ {
+ SortedSet<String> replServers = new TreeSet<String>();
+
+ if (testCase.equals("testFailoversAndWeightChanges"))
+ {
+ // 4 servers used for this test case.
+ for (int i = 0; i < NRS; i++)
+ {
+ replServers.add("localhost:" + rsPort[i]);
+ }
+ } else if (testCase.equals("testSpreadLoad"))
+ {
+ // 4 servers used for this test case.
+ for (int i = 0; i < NRS; i++)
+ {
+ replServers.add("localhost:" + rsPort[i]);
+ }
+ } else if (testCase.equals("testNoYoyo1"))
+ {
+ // 2 servers used for this test case.
+ for (int i = 0; i < 2; i++)
+ {
+ replServers.add("localhost:" + rsPort[i]);
+ }
+ } else if (testCase.equals("testNoYoyo2"))
+ {
+ // 3 servers used for this test case.
+ for (int i = 0; i < 3; i++)
+ {
+ replServers.add("localhost:" + rsPort[i]);
+ }
+ } else if (testCase.equals("testNoYoyo3"))
+ {
+ // 3 servers used for this test case.
+ for (int i = 0; i < 3; i++)
+ {
+ replServers.add("localhost:" + rsPort[i]);
+ }
+ } else
+
+ fail("Unknown test case: " + testCase);
+
+ return replServers;
+ }
+
+ /**
+ * Creates a new ReplicationServer.
+ */
+ private ReplicationServer createReplicationServer(int rsIndex,
+ int weight, String testCase)
+ {
+ SortedSet<String> replServers = new TreeSet<String>();
+ try
+ {
+ if (testCase.equals("testFailoversAndWeightChanges"))
+ {
+ // 4 servers used for this test case.
+ for (int i = 0; i < NRS; i++)
+ {
+ if (i != rsIndex)
+ replServers.add("localhost:" + rsPort[i]);
+ }
+ } else if (testCase.equals("testSpreadLoad"))
+ {
+ // 4 servers used for this test case.
+ for (int i = 0; i < NRS; i++)
+ {
+ if (i != rsIndex)
+ replServers.add("localhost:" + rsPort[i]);
+ }
+ } else if (testCase.equals("testNoYoyo1"))
+ {
+ // 2 servers used for this test case.
+ for (int i = 0; i < 2; i++)
+ {
+ if (i != rsIndex)
+ replServers.add("localhost:" + rsPort[i]);
+ }
+ } else if (testCase.equals("testNoYoyo2"))
+ {
+ // 3 servers used for this test case.
+ for (int i = 0; i < 3; i++)
+ {
+ if (i != rsIndex)
+ replServers.add("localhost:" + rsPort[i]);
+ }
+ } else if (testCase.equals("testNoYoyo3"))
+ {
+ // 3 servers used for this test case.
+ for (int i = 0; i < 3; i++)
+ {
+ if (i != rsIndex)
+ replServers.add("localhost:" + rsPort[i]);
+ }
+ } else
+ fail("Unknown test case: " + testCase);
+
+ String dir = "replicationServerLoadBalancingTest" + rsIndex + testCase + "Db";
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(rsPort[rsIndex], dir, 0, rsIndex+501, 0, 100,
+ replServers, 1, 1000, 5000, weight);
+ ReplicationServer replicationServer = new ReplicationServer(conf);
+ return replicationServer;
+
+ } catch (Exception e)
+ {
+ fail("createReplicationServer " + stackTraceToSingleLineString(e));
+ }
+ return null;
+ }
+
+ /**
+ * Returns a suitable RS configuration with the passed new weight
+ */
+ private ReplicationServerCfg createReplicationServerConfigWithNewWeight
+ (int rsIndex, int weight, String testCase)
+ {
+ SortedSet<String> replServers = new TreeSet<String>();
+ try
+ {
+ if (testCase.equals("testFailoversAndWeightChanges"))
+ {
+ // 4 servers used for this test case.
+ for (int i = 0; i < NRS; i++)
+ {
+ if (i != rsIndex)
+ replServers.add("localhost:" + rsPort[i]);
+ }
+ } else if (testCase.equals("testSpreadLoad"))
+ {
+ // 4 servers used for this test case.
+ for (int i = 0; i < NRS; i++)
+ {
+ if (i != rsIndex)
+ replServers.add("localhost:" + rsPort[i]);
+ }
+ } else
+ fail("Unknown test case: " + testCase);
+
+ String dir = "replicationServerLoadBalancingTest" + rsIndex + testCase + "Db";
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(rsPort[rsIndex], dir, 0, rsIndex+501, 0, 100,
+ replServers, 1, 1000, 5000, weight);
+ return conf;
+
+ } catch (Exception e)
+ {
+ fail("createReplicationServerConfigWithNewWeight " + stackTraceToSingleLineString(e));
+ }
+ return null;
+ }
+
+ /**
+ * Creates a new ReplicationDomain.
+ */
+ private LDAPReplicationDomain createReplicationDomain(int serverId,
+ String testCase)
+ {
+
+ SortedSet<String> replServers = null;
+ try
+ {
+ replServers = createRSListForTestCase(testCase);
+ DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
+ DomainFakeCfg domainConf =
+ new DomainFakeCfg(baseDn, serverId+1, replServers, 1);
+ LDAPReplicationDomain replicationDomain =
+ MultimasterReplication.createNewDomain(domainConf);
+ replicationDomain.start();
+ return replicationDomain;
+
+ } catch (Exception e)
+ {
+ fail("createReplicationDomain " + stackTraceToSingleLineString(e));
+ }
+ return null;
+ }
+
+ /**
+ * Basic weight test: starts some RSs with different weights, start some DSs
+ * and check the DSs are correctly spread across the RSs
+ * @throws Exception If a problem occurred
+ */
+ @Test
+ public void testSpreadLoad() throws Exception
+ {
+ String testCase = "testSpreadLoad";
+
+ debugInfo("Starting " + testCase);
+
+ initTest();
+
+ try
+ {
+
+ /**
+ * Start RS1 weigth=1, RS2 weigth=2, RS3 weigth=3, RS4 weigth=4
+ */
+ // Create and start RS1
+ rs[0] = createReplicationServer(0, 1, testCase);
+ // Create and start RS2
+ rs[1] = createReplicationServer(1, 2, testCase);
+ // Create and start RS3
+ rs[2] = createReplicationServer(2, 3, testCase);
+ // Create and start RS4
+ rs[3] = createReplicationServer(3, 4, testCase);
+
+ // Start a first DS to make every RSs inter connect
+ rd[0] = createReplicationDomain(0, testCase);
+ assertTrue(rd[0].isConnected());
+
+ // Wait for RSs inter-connections
+ checkRSConnectionsAndGenId(new int[] {0, 1, 2, 3},
+ "Waiting for RSs inter-connections");
+
+ /**
+ * Start the 19 other DSs. One should end up with:
+ * - RS1 has 2 DSs
+ * - RS2 has 4 DSs
+ * - RS3 has 6 DSs
+ * - RS4 has 8 DSs
+ */
+ for (int i = 1; i < NDS; i++)
+ {
+ rd[i] = createReplicationDomain(i, testCase);
+ assertTrue(rd[i].isConnected());
+ }
+
+ // Now check the number of connected DSs for each RS
+ assertEquals(getDSConnectedToRS(0), 2,
+ "Wrong expected number of DSs connected to RS1");
+ assertEquals(getDSConnectedToRS(1), 4,
+ "Wrong expected number of DSs connected to RS2");
+ assertEquals(getDSConnectedToRS(2), 6,
+ "Wrong expected number of DSs connected to RS3");
+ assertEquals(getDSConnectedToRS(3), 8,
+ "Wrong expected number of DSs connected to RS4");
+ } finally
+ {
+ endTest();
+ }
+ }
+
+ /**
+ * Return the number of DSs currently connected to the RS with the passed
+ * index
+ */
+ private int getDSConnectedToRS(int rsIndex)
+ {
+ Iterator<ReplicationServerDomain> rsdIt = rs[rsIndex].getDomainIterator();
+ if (rsdIt == null) // No domain yet so no connections yet
+ return 0;
+ return rsdIt.next().getConnectedDSs().keySet().
+ size();
+ }
+
+ /**
+ * Waits for secTimeout seconds (before failing) that all RSs are connected
+ * together and that they have the same generation id.
+ * @param rsIndexes List of the indexes of the RSs that should all be
+ * connected together at the end
+ * @param msg The message to display if the condition is not met before
+ * timeout
+ */
+ private void checkRSConnectionsAndGenId(int[] rsIndexes, String msg)
+ {
+ debugInfo("checkRSConnectionsAndGenId for <" + msg + ">");
+ // Number of seconds to wait for condition before failing
+ int secTimeout = 30;
+ // Number of seconds already passed
+ int nSec = 0;
+ // Number of RSs to take into account
+ int nRSs = rsIndexes.length;
+
+ // Go out of the loop only if connection is verified or if timeout occurs
+ while (true)
+ {
+ // Test connection
+ boolean connected = false;
+ boolean sameGenId = false;
+ Iterator<ReplicationServerDomain> rsdIt = null;
+
+ // Connected together ?
+ int nOk = 0;
+ for (int i = 0; i < nRSs; i++)
+ {
+ int rsIndex = rsIndexes[i];
+ ReplicationServer repServer = rs[rsIndex];
+ rsdIt = repServer.getDomainIterator();
+ int curRsId = repServer.getServerId();
+ Set<Integer> connectedRSsId = null;
+ if (rsdIt != null)
+ {
+ connectedRSsId = rsdIt.next().getConnectedRSs().keySet();
+ } else
+ {
+ // No domain yet, RS is not yet connected to others
+ debugInfo("RS " + curRsId + " has no domain yet");
+ break;
+ }
+ // Does this RS see all other RSs
+ int nPeer = 0;
+ debugInfo("Checking RSs connected to RS " + curRsId);
+ for (int j = 0; j < nRSs; j++)
+ {
+ int otherRsIndex = rsIndexes[j];
+ if (otherRsIndex != rsIndex) // Treat only other RSs
+ {
+ int otherRsId = otherRsIndex+501;
+ if (connectedRSsId.contains(otherRsId))
+ {
+ debugInfo("\tRS " + curRsId + " sees RS " + otherRsId);
+ nPeer++;
+ } else
+ {
+ debugInfo("\tRS " + curRsId + " does not see RS " + otherRsId);
+ }
+ }
+ }
+ if (nPeer == nRSs-1)
+ nOk++;
+ }
+
+ if (nOk == nRSs)
+ {
+ debugInfo("Connections are ok");
+ connected = true;
+ } else
+ {
+ debugInfo("Connections are not ok");
+ }
+
+ // Same gen id ?
+ long refGenId = -1L;
+ boolean refGenIdInitialized = false;
+ nOk = 0;
+ rsdIt = null;
+ for (int i = 0; i < nRSs; i++)
+ {
+ ReplicationServer repServer = rs[i];
+ rsdIt = repServer.getDomainIterator();
+ int curRsId = repServer.getServerId();
+ Long rsGenId = -1L;
+ if (rsdIt != null)
+ {
+ rsGenId = rsdIt.next().getGenerationId();
+ } else
+ {
+ // No domain yet, RS is not yet connected to others
+ debugInfo("RS " + curRsId + " has no domain yet");
+ break;
+ }
+
+ // Expecting all RSs to have gen id equal and not -1
+ if ((rsGenId == -1L))
+ {
+ debugInfo("\tRS " + curRsId + " gen id is -1 which is not expected");
+ break;
+ } else
+ {
+ if (!refGenIdInitialized)
+ {
+ // Store reference gen id all RSs must have
+ refGenId = rsGenId;
+ refGenIdInitialized = true;
+ }
+ }
+ if (rsGenId == refGenId)
+ {
+ debugInfo("\tRS " + curRsId + " gen id is " + rsGenId + " as expected");
+ nOk++;
+ } else
+ {
+ debugInfo("\tRS " + curRsId + " gen id is " + rsGenId
+ + " but expected " + refGenId);
+ }
+ }
+
+ if (nOk == nRSs)
+ {
+ debugInfo("Gen ids are ok");
+ sameGenId = true;
+ } else
+ {
+ debugInfo("Gen ids are not ok");
+ }
+
+ if (connected && sameGenId)
+ {
+ // Connection verified
+ debugInfo("checkRSConnections: all RSs connected and with same gen id obtained after "
+ + nSec + " seconds.");
+ return;
+ }
+
+ // Sleep 1 second
+ try
+ {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex)
+ {
+ fail("Error sleeping " + stackTraceToSingleLineString(ex));
+ }
+ nSec++;
+
+ if (nSec > secTimeout)
+ {
+ // Timeout reached, end with error
+ fail("checkRSConnections: could not obtain that RSs are connected and have the same gen id after "
+ + (nSec-1) + " seconds. [" + msg + "]");
+ }
+ }
+ }
+
+ /**
+ * Execute a full scenario with some RSs failovers and dynamic weight changes.
+ * @throws Exception If a problem occurred
+ */
+ @Test (groups = "slow")
+ public void testFailoversAndWeightChanges() throws Exception
+ {
+ String testCase = "testFailoversAndWeightChanges";
+
+ debugInfo("Starting " + testCase);
+
+ initTest();
+
+ try
+ {
+
+ /**
+ * RS1 (weight=1) starts
+ */
+
+ rs[0] = createReplicationServer(0, 1, testCase);
+
+ /**
+ * DS1 starts and connects to RS1
+ */
+
+ rd[0] = createReplicationDomain(0, testCase);
+ assertTrue(rd[0].isConnected());
+ assertEquals(rd[0].getRsServerId(), RS1_ID);
+
+ /**
+ * RS2 (weight=1) starts
+ */
+
+ rs[1] = createReplicationServer(1, 1, testCase);
+ checkRSConnectionsAndGenId(new int[] {0, 1},
+ "Waiting for RS2 connected to peers");
+
+ /**
+ * DS2 starts and connects to RS2
+ */
+
+ rd[1] = createReplicationDomain(1, testCase);
+ assertTrue(rd[1].isConnected());
+ assertEquals(rd[1].getRsServerId(), RS2_ID);
+
+ /**
+ * RS3 (weight=1) starts
+ */
+
+ rs[2] = createReplicationServer(2, 1, testCase);
+ checkRSConnectionsAndGenId(new int[] {0, 1, 2},
+ "Waiting for RS3 connected to peers");
+
+ /**
+ * DS3 starts and connects to RS3
+ */
+
+ rd[2] = createReplicationDomain(2, testCase);
+ assertTrue(rd[2].isConnected());
+ assertEquals(rd[2].getRsServerId(), RS3_ID);
+
+ /**
+ * DS4 starts and connects to RS1, RS2 or RS3
+ */
+
+ rd[3] = createReplicationDomain(3, testCase);
+ assertTrue(rd[3].isConnected());
+ int ds4ConnectedRsId = rd[3].getRsServerId();
+ assertTrue((ds4ConnectedRsId == RS1_ID) || (ds4ConnectedRsId == RS2_ID) ||
+ (ds4ConnectedRsId == RS3_ID),
+ "DS4 should be connected to either RS1, RS2 or RS3 but is it is " +
+ "connected to RS id " + ds4ConnectedRsId);
+
+ /**
+ * DS5 starts and connects to one of the 2 other RSs
+ */
+
+ rd[4] = createReplicationDomain(4, testCase);
+ assertTrue(rd[4].isConnected());
+ int ds5ConnectedRsId = rd[4].getRsServerId();
+ assertTrue((ds5ConnectedRsId != ds4ConnectedRsId),
+ "DS5 should be connected to a RS which is not the same as the one of " +
+ "DS4 (" + ds4ConnectedRsId + ")");
+
+ /**
+ * DS6 starts and connects to the RS with one DS
+ */
+
+ rd[5] = createReplicationDomain(5, testCase);
+ assertTrue(rd[5].isConnected());
+ int ds6ConnectedRsId = rd[5].getRsServerId();
+ assertTrue((ds6ConnectedRsId != ds4ConnectedRsId) &&
+ (ds6ConnectedRsId != ds5ConnectedRsId),
+ "DS6 should be connected to a RS which is not the same as the one of " +
+ "DS4 (" + ds4ConnectedRsId + ") or DS5 (" + ds5ConnectedRsId + ") : " +
+ ds6ConnectedRsId);
+
+ /**
+ * DS7 to DS12 start, we must end up with RS1, RS2 and RS3 each with 4 DSs
+ */
+ for (int i = 6; i < 12; i++)
+ {
+ rd[i] = createReplicationDomain(i, testCase);
+ assertTrue(rd[i].isConnected());
+ }
+ // Now check the number of connected DSs for each RS
+ assertEquals(getDSConnectedToRS(0), 4,
+ "Wrong expected number of DSs connected to RS1");
+ assertEquals(getDSConnectedToRS(1), 4,
+ "Wrong expected number of DSs connected to RS2");
+ assertEquals(getDSConnectedToRS(2), 4,
+ "Wrong expected number of DSs connected to RS3");
+
+ /**
+ * RS4 (weight=1) starts, we must end up with RS1, RS2, RS3 and RS4 each
+ * with 3 DSs
+ */
+
+ rs[3] = createReplicationServer(3, 1, testCase);
+ checkRSConnectionsAndGenId(new int[] {0, 1, 2, 3},
+ "Waiting for RS4 connected to peers");
+
+ checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {3, 3, 3, 3}},
+ "RS4 started, each RS should have 3 DSs connected to it");
+
+ /**
+ * Change RS3 weight from 1 to 3, we must end up with RS1, RS2 and RS4
+ * each with 2 DSs and RS3 with 6 DSs
+ */
+
+ // Change RS3 weight to 3
+ ReplicationServerCfg newRSConfig =
+ createReplicationServerConfigWithNewWeight(2, 3, testCase);
+ rs[2].applyConfigurationChange(newRSConfig);
+
+ checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {2, 2, 6, 2}},
+ "RS3 changed weight from 1 to 3");
+
+ /**
+ * DS13 to DS20 start, we must end up with RS1, RS2 and RS4 each with 3
+ * or 4 DSs (1 with 4 and the 2 others with 3) and RS3 with 10 DSs
+ */
+
+ for (int i = 12; i < 20; i++)
+ {
+ rd[i] = createReplicationDomain(i, testCase);
+ assertTrue(rd[i].isConnected());
+ }
+ int rsWith4DsIndex = -1; // The RS (index) that has 4 DSs
+ // Now check the number of connected DSs for each RS
+ int rs1ConnectedDSNumber = getDSConnectedToRS(0);
+ assertTrue(((rs1ConnectedDSNumber == 3) || (rs1ConnectedDSNumber == 4)),
+ "Wrong expected number of DSs connected to RS1: " +
+ rs1ConnectedDSNumber);
+ if (rs1ConnectedDSNumber == 4)
+ rsWith4DsIndex = 0;
+ int rs2ConnectedDSNumber = getDSConnectedToRS(1);
+ assertTrue(((rs2ConnectedDSNumber == 3) || (rs2ConnectedDSNumber == 4)),
+ "Wrong expected number of DSs connected to RS2: " +
+ rs2ConnectedDSNumber);
+ if (rs2ConnectedDSNumber == 4)
+ rsWith4DsIndex = 1;
+ int rs4ConnectedDSNumber = getDSConnectedToRS(3);
+ assertTrue(((rs4ConnectedDSNumber == 3) || (rs4ConnectedDSNumber == 4)),
+ "Wrong expected number of DSs connected to RS4: " +
+ rs4ConnectedDSNumber);
+ if (rs4ConnectedDSNumber == 4)
+ rsWith4DsIndex = 3;
+ int sumOfRs1Rs2Rs4 = rs1ConnectedDSNumber + rs2ConnectedDSNumber +
+ rs4ConnectedDSNumber;
+ assertEquals(sumOfRs1Rs2Rs4, 10, "Expected 10 DSs connected to RS1, RS2" +
+ " and RS4");
+ assertEquals(getDSConnectedToRS(2), 10,
+ "Wrong expected number of DSs connected to RS3");
+
+ /**
+ * Stop 2 DSs from RS3, one should end up with RS1 has 3 DSs, RS2 has 3
+ * DSs, RS3 has 9 DSs and RS4 has 3 DSs (with DS (with the lowest server
+ * id) from the RS that had 4 DSs that went to RS3)
+ */
+
+ // Determine the lowest id of DSs connected to the RS with 4 DSs
+ Set<Integer> fourDsList = rs[rsWith4DsIndex].getDomainIterator().next().
+ getConnectedDSs().keySet();
+ assertEquals(fourDsList.size(), 4);
+ int lowestDsId = Integer.MAX_VALUE;
+ for (int id : fourDsList)
+ {
+ if (id < lowestDsId)
+ lowestDsId = id;
+ }
+
+ // Get 2 DS ids of 2 DSs connected to RS3 and stop matching DSs
+ Iterator<Integer> dsIdIt = rs[2].getDomainIterator().next().
+ getConnectedDSs().keySet().iterator();
+ int aFirstDsOnRs3Id = dsIdIt.next() - 1;
+ rd[aFirstDsOnRs3Id].shutdown();
+ int aSecondDsOnRs3Id = dsIdIt.next() - 1;
+ rd[aSecondDsOnRs3Id].shutdown();
+
+ // Check connections
+ checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {3, 3, 9, 3}},
+ "2 DSs ("+ aFirstDsOnRs3Id + "," + aSecondDsOnRs3Id +
+ ") have been stopped from RS3, DS with lowest id (" + lowestDsId +
+ ") should have moved from the RS with 4 DS (RS " +
+ (rsWith4DsIndex+501) + ") to RS3");
+
+ // Check that the right DS moved away from the RS with 4 DSs and went to
+ // RS3 and that the 3 others did not move
+ Set<Integer> dsOnRs3List = rs[2].getDomainIterator().next().
+ getConnectedDSs().keySet();
+ assertTrue(dsOnRs3List.contains(lowestDsId), "DS with the lowest id (" +
+ lowestDsId + " should have come to RS3");
+ Set<Integer> threeDsList = rs[rsWith4DsIndex].getDomainIterator().next().
+ getConnectedDSs().keySet();
+ assertEquals(threeDsList.size(), 3);
+ for (int id : threeDsList)
+ {
+ assertTrue(fourDsList.contains(id), "DS " + id + " should still be on "
+ + "RS " + (rsWith4DsIndex+501));
+ }
+
+ /**
+ * Start the 2 stopped DSs again, we must end up with RS1, RS2 and RS4
+ * each with 3 or 4 DSs (1 with 4 and the 2 others with 3) and RS3 with
+ * 10 DSs
+ */
+
+ // Restart the 2 stopped DSs
+ rd[aFirstDsOnRs3Id] = createReplicationDomain(aFirstDsOnRs3Id, testCase);
+ assertTrue(rd[aFirstDsOnRs3Id].isConnected());
+ rd[aSecondDsOnRs3Id] = createReplicationDomain(aSecondDsOnRs3Id, testCase);
+ assertTrue(rd[aSecondDsOnRs3Id].isConnected());
+ // Now check the number of connected DSs for each RS
+ rs1ConnectedDSNumber = getDSConnectedToRS(0);
+ assertTrue(((rs1ConnectedDSNumber == 3) || (rs1ConnectedDSNumber == 4)),
+ "Wrong expected number of DSs connected to RS1: " +
+ rs1ConnectedDSNumber);
+ rs2ConnectedDSNumber = getDSConnectedToRS(1);
+ assertTrue(((rs2ConnectedDSNumber == 3) || (rs2ConnectedDSNumber == 4)),
+ "Wrong expected number of DSs connected to RS2: " +
+ rs2ConnectedDSNumber);
+ rs4ConnectedDSNumber = getDSConnectedToRS(3);
+ assertTrue(((rs4ConnectedDSNumber == 3) || (rs4ConnectedDSNumber == 4)),
+ "Wrong expected number of DSs connected to RS4: " +
+ rs4ConnectedDSNumber);
+ sumOfRs1Rs2Rs4 = rs1ConnectedDSNumber + rs2ConnectedDSNumber +
+ rs4ConnectedDSNumber;
+ assertEquals(sumOfRs1Rs2Rs4, 10, "Expected 10 DSs connected to RS1, RS2" +
+ " and RS4");
+ assertEquals(getDSConnectedToRS(2), 10,
+ "Wrong expected number of DSs connected to RS3");
+
+ /**
+ * Change RS2 weight to 2, RS3 weight to 4, RS4 weight to 3, we must end
+ * up with RS1 has 2 DSs, RS2 has 4 DSs, RS3 has 8 DSs and RS4 has 6 DSs
+ */
+
+ // Change RS2 weight to 2
+ newRSConfig = createReplicationServerConfigWithNewWeight(1, 2, testCase);
+ rs[1].applyConfigurationChange(newRSConfig);
+ // Change RS3 weight to 4
+ newRSConfig = createReplicationServerConfigWithNewWeight(2, 4, testCase);
+ rs[2].applyConfigurationChange(newRSConfig);
+ // Change RS4 weight to 3
+ newRSConfig = createReplicationServerConfigWithNewWeight(3, 3, testCase);
+ rs[3].applyConfigurationChange(newRSConfig);
+
+ checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {2, 4, 8, 6}},
+ "Changed RS2, RS3 and RS4 weights");
+
+ /**
+ * Stop RS2 and RS4, we must end up with RS1 has 4 DSs, and RS3 has 16 DSs
+ */
+
+ // Stop RS2
+ rs[1].clearDb();
+ rs[1].remove();
+ // Stop RS4
+ rs[3].clearDb();
+ rs[3].remove();
+
+ checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {4, -1, 16, -1}},
+ "Stopped RS2 and RS4");
+
+ /**
+ * Restart RS2 and RS4 with same weights (2 and 3), we must end up with
+ * RS1 has 2 DSs, RS2 has 4 DSs, RS3 has 8 DSs and RS4 has 6 DSs
+ */
+
+ // Restart RS2
+ rs[1] = createReplicationServer(1, 2, testCase);
+ // Restart RS4
+ rs[3] = createReplicationServer(3, 3, testCase);
+
+ checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {2, 4, 8, 6}},
+ "Restarted RS2 and RS4");
+
+ /**
+ * Stop RS3, we must end up with RS1 has 3 DSs, and RS2 has 7 DSs and
+ * RS4 has 10 DSs
+ */
+
+ // Stop RS3
+ rs[2].clearDb();
+ rs[2].remove();
+
+ checkForCorrectNumbersOfConnectedDSs(new int[][]{
+ new int[] {2, 8, -1, 10},
+ new int[] {3, 7, -1, 10},
+ new int[] {3, 8, -1, 9},
+ new int[] {4, 6, -1, 10},
+ new int[] {4, 7, -1, 9},
+ new int[] {5, 6, -1, 9}},
+ "Stopped RS3");
+
+ /**
+ * Restart RS3 with same weight (4), we must end up with RS1 has 2 DSs,
+ * RS2 has 4 DSs, RS3 has 8 DSs and RS4 has 6 DSs
+ */
+
+ // Restart RS3
+ rs[2] = createReplicationServer(2, 4, testCase);
+
+ checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {2, 4, 8, 6}},
+ "Restarted RS2 and RS4");
+
+ /**
+ * Stop RS1, RS2 and RS3, all DSs should be connected to RS4
+ */
+
+ // Stop RS1
+ rs[0].clearDb();
+ rs[0].remove();
+ // Stop RS2
+ rs[1].clearDb();
+ rs[1].remove();
+ // Stop RS3
+ rs[2].clearDb();
+ rs[2].remove();
+
+ checkForCorrectNumbersOfConnectedDSs(new int[][]{new int[] {-1, -1, -1, 20}},
+ "Stopped RS1, RS2 and RS3");
+
+ } finally
+ {
+ endTest();
+ }
+ }
+
+ // Translate an int array into a human readable string
+ private static String intArrayToString(int[] ints)
+ {
+ StringBuffer sb = new StringBuffer("[");
+ for (int i = 0; i < ints.length; i++)
+ {
+ if (i != 0)
+ sb.append(",");
+ sb.append(ints[i]);
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ // Translate an int[][] array into a human readable string
+ private static String intArrayToString(int[][] ints)
+ {
+ StringBuffer sb = new StringBuffer("[");
+ for (int i = 0; i < ints.length; i++)
+ {
+ if (i != 0)
+ sb.append(",");
+ sb.append(intArrayToString(ints[i]));
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ /**
+ * Wait for the correct number of connected DSs for each RS. Fails if timeout
+ * before condition met.
+ * @param possibleExpectedDSsNumbers The expected number of connected DSs for each
+ * RS. -1 if the matching RS should not be taken into account. This is a list of
+ * possible expected situation
+ * @param msg The message to display if the condition is not met before
+ * timeout
+ */
+ private void checkForCorrectNumbersOfConnectedDSs(int[][] possibleExpectedDSsNumbers,
+ String msg)
+ {
+ // Time to wait before condition met: warning, this should let enough
+ // time to the topology to auto-balance. Currently this must at least let
+ // enough time to a topo message being received and to monitoring messages
+ // being received after (2 monitoring publisher period)
+ int secTimeout = 30;
+ int nSec = 0;
+ // To display what has been seen
+ int[] finalDSsNumbers = new int[possibleExpectedDSsNumbers[0].length];
+
+ // Go out of the loop only if connection is verified or if timeout occurs
+ while (true)
+ {
+ for (int i = 0; i < possibleExpectedDSsNumbers.length; i++)
+ {
+ // Examine next possible final situation
+ int[] expectedDSsNumbers = possibleExpectedDSsNumbers[i];
+ // Examine connections
+ int nOk = 0; // Number of RSs ok
+ int nRSs = 0; // Number of RSs to examine
+ for (int j = 0; j < finalDSsNumbers.length; j++)
+ {
+ int expectedDSNumber = expectedDSsNumbers[j];
+
+ if (expectedDSNumber != -1)
+ {
+ nRSs++;
+ // Check for number of DSs connected to this RS
+ int connectedDSs = getDSConnectedToRS(j);
+ if (connectedDSs == expectedDSNumber)
+ {
+ nOk++;
+ }
+ // Store result for this RS
+ finalDSsNumbers[j] = connectedDSs;
+ }
+ else
+ {
+ // Store result for this RS
+ finalDSsNumbers[j] = -1;
+ }
+ }
+
+ if (nOk == nRSs)
+ {
+ // Connection verified
+ debugInfo("checkForCorrectNumbersOfConnectedDSs: got expected " +
+ "connections " + intArrayToString(expectedDSsNumbers) + " after " + nSec +
+ " seconds.");
+ return;
+ }
+ }
+
+ // Sleep 1 second
+ try
+ {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex)
+ {
+ fail("Error sleeping " + stackTraceToSingleLineString(ex));
+ }
+ nSec++;
+
+ if (nSec > secTimeout)
+ {
+ // Timeout reached, end with error
+ fail("checkForCorrectNumbersOfConnectedDSs: could not get expected " +
+ "connections " + intArrayToString(possibleExpectedDSsNumbers) + " after " + (nSec-1) +
+ " seconds. Got this result : " + intArrayToString(finalDSsNumbers) +
+ " [" + msg + "]");
+ }
+ }
+ }
+
+ /**
+ * In a topology where the balance cannot be exactly reached according to the
+ * weights, this is testing that the DS is not doing yoyo. The yoyo effect
+ * would be a DS keeping going between RSs (going to/back from other RS for
+ * ever).
+ *
+ * RS1 weight=1 ; RS2 weight=1 ; 3DSs.
+ * We expect two DSs on one RS and the last one on the other RS and no
+ * disconnections/reconnections after the very first connections.
+ * @throws Exception If a problem occurred
+ */
+ @Test (groups = "slow")
+ public void testNoYoyo1() throws Exception
+ {
+ String testCase = "testNoYoyo1";
+
+ debugInfo("Starting " + testCase);
+
+ initTest();
+
+ try
+ {
+
+ /**
+ * RS1 (weight=1) starts
+ */
+
+ rs[0] = createReplicationServer(0, 1, testCase);
+
+ /**
+ * DS1 starts and connects to RS1
+ */
+
+ rd[0] = createReplicationDomain(0, testCase);
+ assertTrue(rd[0].isConnected());
+ assertEquals(rd[0].getRsServerId(), RS1_ID);
+
+ /**
+ * RS2 (weight=1) starts
+ */
+
+ rs[1] = createReplicationServer(1, 1, testCase);
+ checkRSConnectionsAndGenId(new int[] {0, 1},
+ "Waiting for RS2 connected to peers");
+
+ /**
+ * DS2 starts and connects to RS2
+ */
+
+ rd[1] = createReplicationDomain(1, testCase);
+ assertTrue(rd[1].isConnected());
+ assertEquals(rd[1].getRsServerId(), RS2_ID);
+
+ /**
+ * DS3 starts and connects to either RS1 or RS2 but should stay on it
+ */
+
+ int dsIsIndex = 2;
+ rd[dsIsIndex] = createReplicationDomain(dsIsIndex, testCase);
+ assertTrue(rd[dsIsIndex].isConnected());
+
+ int rsId = rd[dsIsIndex].getRsServerId();
+ int rsIndex = rsId - 501;
+ int nDSs = getDSConnectedToRS(rsIndex);
+ assertEquals(getDSConnectedToRS(rsIndex), 2, " Expected 2 DSs on RS " +
+ rsId);
+ debugInfo(testCase + ": DS3 connected to RS " + rsId + ", with " + nDSs
+ + " DSs");
+
+ // Be sure that DS3 stays connected to the same RS during some long time
+ // check every second
+ int waitTime = 10;
+ int elapsedTime = 0;
+ while (elapsedTime < waitTime)
+ {
+ Thread.sleep(1000);
+ // Still connected to the right RS ?
+ assertEquals(rd[dsIsIndex].getRsServerId(), rsId, "DS3 should still be " +
+ "connected to RS " + rsId);
+ assertEquals(getDSConnectedToRS(rsIndex), 2, " Expected 2 DSs on RS " +
+ rsId);
+ elapsedTime++;
+ }
+
+ } finally
+ {
+ endTest();
+ }
+ }
+
+ /**
+ * In a topology where the balance cannot be exactly reached according to the
+ * weights, this is testing that the DS is not doing yoyo. The yoyo effect
+ * would be a DS keeping going between RSs (going to/back from other RS for
+ * ever).
+ *
+ * RS1 weight=1 ; RS2 weight=1 ; RS3 weight=1 ; 4DSs.
+ * We expect 1 RS with 2 DSs and the 2 other RSs with 1 DS each and no
+ * disconnections/reconnections after the very first connections.
+ * @throws Exception If a problem occurred
+ */
+ @Test (groups = "slow")
+ public void testNoYoyo2() throws Exception
+ {
+ String testCase = "testNoYoyo2";
+
+ debugInfo("Starting " + testCase);
+
+ initTest();
+
+ try
+ {
+
+ /**
+ * RS1 (weight=1) starts
+ */
+
+ rs[0] = createReplicationServer(0, 1, testCase);
+
+ /**
+ * DS1 starts and connects to RS1
+ */
+
+ rd[0] = createReplicationDomain(0, testCase);
+ assertTrue(rd[0].isConnected());
+ assertEquals(rd[0].getRsServerId(), RS1_ID);
+
+ /**
+ * RS2 (weight=1) and R3 (weight=1) start
+ */
+
+ rs[1] = createReplicationServer(1, 1, testCase);
+ rs[2] = createReplicationServer(2, 1, testCase);
+ checkRSConnectionsAndGenId(new int[] {0, 1, 2},
+ "Waiting for RSs being connected to peers");
+
+ /**
+ * DS2 to DS3 start and connects to RSs
+ */
+
+ for (int i = 1; i < 3; i++)
+ {
+ rd[i] = createReplicationDomain(i, testCase);
+ assertTrue(rd[i].isConnected());
+ }
+
+ /**
+ * DS4 starts and connects to either RS1 RS2 or RS3 but should stay on it
+ */
+
+ int dsIsIndex = 3;
+ rd[dsIsIndex] = createReplicationDomain(dsIsIndex, testCase);
+ assertTrue(rd[dsIsIndex].isConnected());
+
+ int rsId = rd[dsIsIndex].getRsServerId();
+ int rsIndex = rsId - 501;
+ int nDSs = getDSConnectedToRS(rsIndex);
+ assertEquals(getDSConnectedToRS(rsIndex), 2, " Expected 2 DSs on RS " +
+ rsId);
+ debugInfo(testCase + ": DS4 connected to RS " + rsId + ", with " + nDSs
+ + " DSs");
+
+ // Be sure that DS3 stays connected to the same RS during some long time
+ // check every second
+ int waitTime = 10;
+ int elapsedTime = 0;
+ while (elapsedTime < waitTime)
+ {
+ Thread.sleep(1000);
+ // Still connected to the right RS ?
+ assertEquals(rd[dsIsIndex].getRsServerId(), rsId, "DS4 should still be " +
+ "connected to RS " + rsId);
+ assertEquals(getDSConnectedToRS(rsIndex), 2, " Expected 2 DSs on RS " +
+ rsId);
+ elapsedTime++;
+ }
+
+ } finally
+ {
+ endTest();
+ }
+ }
+
+ /**
+ * In a topology where the balance cannot be exactly reached according to the
+ * weights, this is testing that the DS is not doing yoyo. The yoyo effect
+ * would be a DS keeping going between RSs (going to/back from other RS for
+ * ever).
+ *
+ * RS1 weight=1 ; RS2 weight=1 ; RS3 weight=1 ; 7DSs.
+ * We expect 1 RS with 3 DSs and the 2 other RSs with 2 DS each and no
+ * disconnections/reconnections after the very first connections.
+ * @throws Exception If a problem occurred
+ */
+ @Test (groups = "slow")
+ public void testNoYoyo3() throws Exception
+ {
+ String testCase = "testNoYoyo3";
+
+ debugInfo("Starting " + testCase);
+
+ initTest();
+
+ try
+ {
+
+ /**
+ * RS1 (weight=1) starts
+ */
+
+ rs[0] = createReplicationServer(0, 1, testCase);
+
+ /**
+ * DS1 starts and connects to RS1
+ */
+
+ rd[0] = createReplicationDomain(0, testCase);
+ assertTrue(rd[0].isConnected());
+ assertEquals(rd[0].getRsServerId(), RS1_ID);
+
+ /**
+ * RS2 (weight=1) and R3 (weight=1) start
+ */
+
+ rs[1] = createReplicationServer(1, 1, testCase);
+ rs[2] = createReplicationServer(2, 1, testCase);
+ checkRSConnectionsAndGenId(new int[] {0, 1, 2},
+ "Waiting for RSs being connected to peers");
+
+ /**
+ * DS2 to DS6 start and connects to RSs
+ */
+
+ for (int i = 1; i < 6; i++)
+ {
+ rd[i] = createReplicationDomain(i, testCase);
+ assertTrue(rd[i].isConnected());
+ }
+
+ /**
+ * DS7 starts and connects to either RS1 RS2 or RS3 but should stay on it
+ */
+
+ int dsIsIndex = 6;
+ rd[dsIsIndex] = createReplicationDomain(dsIsIndex, testCase);
+ assertTrue(rd[dsIsIndex].isConnected());
+
+ int rsId = rd[dsIsIndex].getRsServerId();
+ int rsIndex = rsId - 501;
+ int nDSs = getDSConnectedToRS(rsIndex);
+ assertEquals(getDSConnectedToRS(rsIndex), 3, " Expected 2 DSs on RS " +
+ rsId);
+ debugInfo(testCase + ": DS7 connected to RS " + rsId + ", with " + nDSs
+ + " DSs");
+
+ // Be sure that DS3 stays connected to the same RS during some long time
+ // check every second
+ int waitTime = 10;
+ int elapsedTime = 0;
+ while (elapsedTime < waitTime)
+ {
+ Thread.sleep(1000);
+ // Still connected to the right RS ?
+ assertEquals(rd[dsIsIndex].getRsServerId(), rsId, "DS7 should still be " +
+ "connected to RS " + rsId);
+ assertEquals(getDSConnectedToRS(rsIndex), 3, " Expected 2 DSs on RS " +
+ rsId);
+ elapsedTime++;
+ }
+
+ } finally
+ {
+ endTest();
+ }
+ }
+}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
index 889418b..b1472db 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -22,10 +22,13 @@
* CDDL HEADER END
*
*
- * Copyright 2008-2009 Sun Microsystems, Inc.
+ * Copyright 2008-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
+import java.net.UnknownHostException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
@@ -37,6 +40,7 @@
import static org.testng.Assert.fail;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.HashSet;
@@ -821,22 +825,34 @@
private RSInfo createRSInfo(int rsId)
{
int groupId = -1;
+ String serverUrl = null;
+ String localHostname = null;
+ try
+ {
+ localHostname = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException ex)
+ {
+ fail("Could not get local host name: " + ex.getMessage());
+ }
switch (rsId)
{
case RS1_ID:
groupId = RS1_GID;
+ serverUrl = localHostname + ":" + rs1Port;
break;
case RS2_ID:
groupId = RS2_GID;
+ serverUrl = localHostname + ":" + rs2Port;
break;
case RS3_ID:
groupId = RS3_GID;
+ serverUrl = localHostname + ":" + rs3Port;
break;
default:
fail("Unknown replication server id.");
}
- return new RSInfo(rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId, 1);
+ return new RSInfo(rsId, serverUrl, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId, 1);
}
/**
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
index b937ea1..17ca67a 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2009 Sun Microsystems, Inc.
+ * Copyright 2009-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -1092,11 +1092,11 @@
dsList4.add(dsInfo2);
dsList4.add(dsInfo1);
- RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1);
+ RSInfo rsInfo1 = new RSInfo(4527, null, (long)45316, (byte)103, 1);
- RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1);
+ RSInfo rsInfo2 = new RSInfo(4527, null, (long)0, (byte)0, 1);
- RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1);
+ RSInfo rsInfo3 = new RSInfo(0, null, (long)-21113, (byte)98, 1);
List<RSInfo> rsList1 = new ArrayList<RSInfo>();
rsList1.add(rsInfo1);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index dc67c82..2af2326 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2009 Sun Microsystems, Inc.
+ * Copyright 2006-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -1026,13 +1026,13 @@
dsList4.add(dsInfo2);
dsList4.add(dsInfo1);
- RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1);
+ RSInfo rsInfo1 = new RSInfo(4527, "rsHost1:123", (long)45316, (byte)103, 1);
- RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1);
+ RSInfo rsInfo2 = new RSInfo(4527, "rsHost2:456", (long)0, (byte)0, 1);
- RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1);
+ RSInfo rsInfo3 = new RSInfo(0, "rsHost3:789", (long)-21113, (byte)98, 1);
- RSInfo rsInfo4 = new RSInfo(45678, (long)-21113, (byte)98, 1);
+ RSInfo rsInfo4 = new RSInfo(45678, "rsHost4:1011", (long)-21113, (byte)98, 1);
List<RSInfo> rsList1 = new ArrayList<RSInfo>();
rsList1.add(rsInfo1);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index f95550a..cc9bb17 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2008-2009 Sun Microsystems, Inc.
+ * Copyright 2008-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
@@ -969,7 +969,7 @@
}
// Send our topo mesg
- RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1);
+ RSInfo rsInfo = new RSInfo(serverId, fakeUrl, generationId, groupId, 1);
List<RSInfo> rsInfos = new ArrayList<RSInfo>();
rsInfos.add(rsInfo);
TopologyMsg topoMsg = new TopologyMsg(null, rsInfos);
--
Gitblit v1.10.0