From 874f6e3a092bdaa5f151c512c9284b15f5886e82 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Tue, 19 Jan 2010 09:53:03 +0000
Subject: [PATCH] This is about refactoring the way the directory server chooses the replication server it will connect to. This also introduces a new (weighted) load balancing feature that spreads DS connections across the RSs, according to the RS weights defined by the administrator.
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 1620 ++++++++++++++++++++++++++++++++++++++---------------------
1 files changed, 1,047 insertions(+), 573 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index bd145d6..baa4310 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -26,12 +26,16 @@
*/
package org.opends.server.replication.service;
+import java.net.UnknownHostException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -40,10 +44,12 @@
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -51,7 +57,6 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
-import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
@@ -102,7 +107,7 @@
private Semaphore sendWindow;
private int maxSendWindow;
private int rcvWindow = 100;
- private int halfRcvWindow = rcvWindow/2;
+ private int halfRcvWindow = rcvWindow / 2;
private int maxRcvWindow = rcvWindow;
private int timeout = 0;
private short protocolVersion;
@@ -117,13 +122,11 @@
private String rsServerUrl = null;
// Our replication domain
private ReplicationDomain domain = null;
-
/**
* This object is used as a conditional event to be notified about
* the reception of monitor information from the Replication Server.
*/
private final MutableBoolean monitorResponse = new MutableBoolean(false);
-
/**
* A Map containing the ServerStates of all the replicas in the topology
* as seen by the ReplicationServer the last time it was polled or the last
@@ -131,15 +134,6 @@
*/
private HashMap<Integer, ServerState> replicaStates =
new HashMap<Integer, ServerState>();
-
- /**
- * A Map containing the ServerStates of all the replication servers in the
- * topology as seen by the ReplicationServer the last time it was polled or
- * the last time it published monitoring information.
- */
- private HashMap<Integer, ServerState> rsStates =
- new HashMap<Integer, ServerState>();
-
/**
* The expected duration in milliseconds between heartbeats received
* from the replication server. Zero means heartbeats are off.
@@ -163,10 +157,6 @@
*/
private boolean connectionError = false;
private final Object connectPhaseLock = new Object();
-
- // Same group id poller thread
- private SameGroupIdPoller sameGroupIdPoller = null;
-
/**
* The thread that publishes messages to the RS containing the current
* change time of this DS.
@@ -174,7 +164,7 @@
private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null;
/**
* The expected period in milliseconds between these messages are sent
- * to the replication server. Zero means heartbeats are off.
+ * to the replication server. Zero means heartbeats are off.
*/
private long changeTimeHeartbeatSendInterval = 0;
/*
@@ -183,12 +173,30 @@
// Info for other DSs.
// Warning: does not contain info for us (for our server id)
private List<DSInfo> dsList = new ArrayList<DSInfo>();
- // Info for other RSs.
- private List<RSInfo> rsList = new ArrayList<RSInfo>();
-
private long generationID;
private int updateDoneCount = 0;
private boolean connectRequiresRecovery = false;
+ /**
+ * The map of replication server info initialized at connection time and
+ * regularly updated. This is used to decide which replication server is
+ * the most suitable one to connect to.
+ * Key: replication server id
+ * Value: replication server info for the matching replication server id
+ */
+ private Map<Integer, ReplicationServerInfo> replicationServerInfos = null;
+ /**
+ * This integer defines when the best replication server checking algorithm
+ * should be engaged.
+ * Every time a monitoring message (each monitoring publisher period) is
+ * received, it is incremented. When it reaches 2, we run the checking
+ * algorithm to see if we must reconnect to another best replication server.
+ * Then we reset the value to 0. But when a topology message is received, the
+ * integer is reset to 0. This ensures that we wait at least one monitoring
+ * publisher period before running the algorithm, but also that we wait at
+ * least for a monitoring period after the last received topology message
+ * (topology stabilization).
+ */
+ private int mustRunBestServerCheckingAlgorithm = 0;
/**
* Creates a new ReplicationServer Broker for a particular ReplicationDomain.
@@ -228,7 +236,7 @@
this.heartbeatInterval = heartbeatInterval;
this.maxRcvWindow = window;
this.maxRcvWindow = window;
- this.halfRcvWindow = window /2;
+ this.halfRcvWindow = window / 2;
this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
}
@@ -284,7 +292,7 @@
return rsServerId;
}
- /**
+ /**
* Gets the server id.
* @return The server id
*/
@@ -300,9 +308,11 @@
private long getGenerationID()
{
if (domain != null)
- return domain.getGenerationID();
- else
- return generationID;
+ {
+ // Update the generation id
+ generationID = domain.getGenerationID();
+ }
+ return generationID;
}
/**
@@ -315,48 +325,167 @@
}
/**
- * Bag class for keeping info we get from a server in order to compute the
- * best one to connect to. This is in fact a wrapper to a
- * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4).
+ * Sets the locally configured flag for the passed ReplicationServerInfo
+ * object, analyzing the local configuration.
+ * @param replicationServerInfo The replication server info to update.
*/
- public static class ServerInfo
+ private void updateRSInfoLocallyConfiguredStatus(
+ ReplicationServerInfo replicationServerInfo)
+ {
+ // Determine if the passed ReplicationServerInfo has a URL that is present
+ // in the locally configured replication servers
+ String rsUrl = replicationServerInfo.getServerURL();
+ if (rsUrl == null)
+ {
+ // The ReplicationServerInfo has been generated from a server with
+ // no URL in TopologyMsg (i.e: with replication protocol version < 4):
+ // ignore this server as we do not know how to connect to it
+ replicationServerInfo.setLocallyConfigured(false);
+ return;
+ }
+ for (String serverUrl : servers)
+ {
+ if (isSameReplicationServerUrl(serverUrl, rsUrl))
+ {
+ // This RS is locally configured, mark this
+ replicationServerInfo.setLocallyConfigured(true);
+ return;
+ }
+ }
+ replicationServerInfo.setLocallyConfigured(false);
+ }
+
+ /**
+ * Compares 2 replication servers addresses and returns true if they both
+ * represent the same replication server instance.
+ * @param rs1Url Replication server 1 address
+ * @param rs2Url Replication server 2 address
+ * @return True if both replication server addresses represent the same
+ * replication server instance, false otherwise.
+ */
+ private static boolean isSameReplicationServerUrl(String rs1Url,
+ String rs2Url)
+ {
+ // Get and compare ports of RS1 and RS2
+ int separator1 = rs1Url.lastIndexOf(':');
+ if (separator1 < 0)
+ {
+ // Not a RS url: should not happen
+ return false;
+ }
+ int rs1Port = Integer.parseInt(rs1Url.substring(separator1 + 1));
+
+ int separator2 = rs2Url.lastIndexOf(':');
+ if (separator2 < 0)
+ {
+ // Not a RS url: should not happen
+ return false;
+ }
+ int rs2Port = Integer.parseInt(rs2Url.substring(separator2 + 1));
+
+ if (rs1Port != rs2Port)
+ {
+ return false;
+ }
+
+ // Get and compare addresses of RS1 and RS2
+ String rs1 = rs1Url.substring(0, separator1);
+ InetAddress[] rs1Addresses = null;
+ try
+ {
+ if (rs1.equals("localhost") || rs1.equals("127.0.0.1"))
+ {
+ // Replace localhost with the local official hostname
+ rs1 = InetAddress.getLocalHost().getHostName();
+ }
+ rs1Addresses = InetAddress.getAllByName(rs1);
+ } catch (UnknownHostException ex)
+ {
+ // Unknown RS: should not happen
+ return false;
+ }
+
+ String rs2 = rs2Url.substring(0, separator2);
+ InetAddress[] rs2Addresses = null;
+ try
+ {
+ if (rs2.equals("localhost") || rs2.equals("127.0.0.1"))
+ {
+ // Replace localhost with the local official hostname
+ rs2 = InetAddress.getLocalHost().getHostName();
+ }
+ rs2Addresses = InetAddress.getAllByName(rs2);
+ } catch (UnknownHostException ex)
+ {
+ // Unknown RS: should not happen
+ return false;
+ }
+
+ // Now compare addresses, if at least one match, this is the same server
+ for (int i = 0; i < rs1Addresses.length; i++)
+ {
+ InetAddress inetAddress1 = rs1Addresses[i];
+ for (int j = 0; j < rs2Addresses.length; j++)
+ {
+ InetAddress inetAddress2 = rs2Addresses[j];
+ if (inetAddress2.equals(inetAddress1))
+ {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Bag class for keeping info we get from a replication server in order to
+ * compute the best one to connect to. This is in fact a wrapper to a
+ * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be
+ * updated with info coming from received topology messages or monitoring
+ * messages.
+ */
+ public static class ReplicationServerInfo
{
private short protocolVersion;
private long generationId;
private byte groupId = (byte) -1;
private int serverId;
+ // Received server URL
private String serverURL;
private String baseDn = null;
private int windowSize;
- private ServerState serverState;
+ private ServerState serverState = null;
private boolean sslEncryption;
private int degradedStatusThreshold = -1;
- // Keeps the -1 value if created with a ReplServerStartMsg
- private int weight = -1;
- // Keeps the -1 value if created with a ReplServerStartMsg
- private int connectedDSNumber = -1;
+ // Keeps the 1 value if created with a ReplServerStartMsg
+ private int weight = 1;
+ // Keeps the 0 value if created with a ReplServerStartMsg
+ private int connectedDSNumber = 0;
+ private List<Integer> connectedDSs = null;
+ // Is this RS locally configured ? (the RS is recognized as a usable server)
+ private boolean locallyConfigured = true;
/**
- * Create a new instance of ServerInfo wrapping the passed message.
+ * Create a new instance of ReplicationServerInfo wrapping the passed
+ * message.
* @param msg Message to wrap.
* @return The new instance wrapping the passed message.
* @throws IllegalArgumentException If the passed message has an unexpected
* type.
*/
- public static ServerInfo newServerInfo(
+ public static ReplicationServerInfo newInstance(
ReplicationMsg msg) throws IllegalArgumentException
{
if (msg instanceof ReplServerStartMsg)
{
// This is a ReplServerStartMsg (RS uses protocol V3 or under)
- ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg)msg;
- return new ServerInfo(replServerStartMsg);
- }
- else if (msg instanceof ReplServerStartDSMsg)
+ ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg) msg;
+ return new ReplicationServerInfo(replServerStartMsg);
+ } else if (msg instanceof ReplServerStartDSMsg)
{
// This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
- ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg)msg;
- return new ServerInfo(replServerStartDSMsg);
+ ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg) msg;
+ return new ReplicationServerInfo(replServerStartDSMsg);
}
// Unsupported message type: should not happen
@@ -365,10 +494,10 @@
}
/**
- * Constructs a ServerInfo object wrapping a ReplServerStartMsg.
+ * Constructs a ReplicationServerInfo object wrapping a ReplServerStartMsg.
* @param replServerStartMsg The ReplServerStartMsg this object will wrap.
*/
- private ServerInfo(ReplServerStartMsg replServerStartMsg)
+ private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg)
{
this.protocolVersion = replServerStartMsg.getVersion();
this.generationId = replServerStartMsg.getGenerationId();
@@ -384,11 +513,12 @@
}
/**
- * Constructs a ServerInfo object wrapping a ReplServerStartDSMsg.
+ * Constructs a ReplicationServerInfo object wrapping a
+ * ReplServerStartDSMsg.
* @param replServerStartDSMsg The ReplServerStartDSMsg this object will
* wrap.
*/
- private ServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
+ private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
{
this.protocolVersion = replServerStartDSMsg.getVersion();
this.generationId = replServerStartDSMsg.getGenerationId();
@@ -514,16 +644,98 @@
{
return connectedDSNumber;
}
+
+ /**
+ * Constructs a new replication server info with the passed RSInfo
+ * internal values and the passed connected DSs.
+ * @param rsInfo The RSInfo to take the initial values from
+ * @param connectedDSs The new connected DSs
+ */
+ public ReplicationServerInfo(RSInfo rsInfo, List<Integer> connectedDSs)
+ {
+ this.serverId = rsInfo.getId();
+ this.serverURL = rsInfo.getServerUrl();
+ this.generationId = rsInfo.getGenerationId();
+ this.groupId = rsInfo.getGroupId();
+ this.weight = rsInfo.getWeight();
+ this.connectedDSs = connectedDSs;
+ this.connectedDSNumber = connectedDSs.size();
+ }
+
+ /**
+ * Converts the object to a RSInfo object.
+ * @return The RSInfo object matching this object.
+ */
+ public RSInfo toRSInfo()
+ {
+ return new RSInfo(serverId, serverURL, generationId, groupId, weight);
+ }
+
+ /**
+ * Updates replication server info with the passed RSInfo internal values
+ * and the passed connected DSs.
+ * @param rsInfo The RSinfo to use for the update
+ * @param connectedDSs The new connected DSs
+ */
+ public void update(RSInfo rsInfo, List<Integer> connectedDSs)
+ {
+ this.generationId = rsInfo.getGenerationId();
+ this.groupId = rsInfo.getGroupId();
+ this.weight = rsInfo.getWeight();
+ this.connectedDSs = connectedDSs;
+ this.connectedDSNumber = connectedDSs.size();
+ }
+
+ /**
+ * Updates replication server info with the passed server state.
+ * @param serverState The ServerState to use for the update
+ */
+ public void update(ServerState serverState)
+ {
+ if (this.serverState != null)
+ {
+ this.serverState.update(serverState);
+ } else
+ {
+ this.serverState = serverState;
+ }
+ }
+
+ /**
+ * Gets the list of DSs connected to this replication server.
+ * @return The list of connected DSs
+ */
+ public List<Integer> getConnectedDSs()
+ {
+ return connectedDSs;
+ }
+
+ /**
+ * Gets the locally configured status for this RS.
+ * @return the locallyConfigured
+ */
+ public boolean isLocallyConfigured()
+ {
+ return locallyConfigured;
+ }
+
+ /**
+ * Sets the locally configured status for this RS.
+ * @param locallyConfigured the locallyConfigured to set
+ */
+ public void setLocallyConfigured(boolean locallyConfigured)
+ {
+ this.locallyConfigured = locallyConfigured;
+ }
}
private void connect()
{
if (this.baseDn.compareToIgnoreCase(
- ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)==0)
+ ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT) == 0)
{
connectAsECL();
- }
- else
+ } else
{
connectAsDataServer();
}
@@ -534,19 +746,22 @@
* able to choose the more suitable.
* @return the collected information.
*/
- private Map<String, ServerInfo> collectReplicationServersInfo() {
+ private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
+ {
- Map<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+ Map<Integer, ReplicationServerInfo> rsInfos =
+ new HashMap<Integer, ReplicationServerInfo>();
for (String server : servers)
{
// Connect to server and get info about it
- ServerInfo serverInfo = performPhaseOneHandshake(server, false);
+ ReplicationServerInfo replicationServerInfo =
+ performPhaseOneHandshake(server, false);
// Store server info in list
- if (serverInfo != null)
+ if (replicationServerInfo != null)
{
- rsInfos.put(server, serverInfo);
+ rsInfos.put(replicationServerInfo.getServerId(), replicationServerInfo);
}
}
@@ -558,7 +773,6 @@
* are :
* - 1 single RS configured
* - so no choice of the preferred RS
- * - No same groupID polling
* - ?? Heartbeat
* - Start handshake is :
* Broker ---> StartECLMsg ---> RS
@@ -570,10 +784,10 @@
// FIXME:ECL List of RS to connect is for now limited to one RS only
String bestServer = this.servers.iterator().next();
- ReplServerStartDSMsg inReplServerStartDSMsg
- = performECLPhaseOneHandshake(bestServer, true);
+ ReplServerStartDSMsg inReplServerStartDSMsg = performECLPhaseOneHandshake(
+ bestServer, true);
- if (inReplServerStartDSMsg!=null)
+ if (inReplServerStartDSMsg != null)
performECLPhaseTwoHandshake(bestServer);
}
@@ -589,7 +803,7 @@
*
* phase 1:
* DS --- ServerStartMsg ---> RS
- * DS <--- ReplServerStartMsg --- RS
+ * DS <--- ReplServerStartDSMsg --- RS
* phase 2:
* DS --- StartSessionMsg ---> RS
* DS <--- TopologyMsg --- RS
@@ -615,9 +829,9 @@
}
// Stop any existing poller and heartbeat monitor from a previous session.
- stopSameGroupIdPoller();
stopRSHeartBeatMonitoring();
stopChangeTimeHeartBeatPublishing();
+ mustRunBestServerCheckingAlgorithm = 0;
boolean newServerWithSameGroupId = false;
synchronized (connectPhaseLock)
@@ -628,41 +842,47 @@
*/
if (debugEnabled())
TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " +
- " order to elect the preferred one");
+ " order to elect the preferred one");
// Get info from every available replication servers
- Map<String, ServerInfo> rsInfos = collectReplicationServersInfo();
+ replicationServerInfos = collectReplicationServersInfo();
- ServerInfo serverInfo = null;
+ ReplicationServerInfo replicationServerInfo = null;
- if (rsInfos.size() > 0)
+ if (replicationServerInfos.size() > 0)
{
// At least one server answered, find the best one.
- String bestServer = computeBestReplicationServer(state, rsInfos,
- serverId, baseDn, groupId);
+ replicationServerInfo = computeBestReplicationServer(true, -1, state,
+ replicationServerInfos, serverId, baseDn, groupId,
+ this.getGenerationID());
// Best found, now initialize connection to this one (handshake phase 1)
if (debugEnabled())
TRACER.debugInfo(
- "phase 2 : will perform PhaseOneH with the preferred RS.");
- serverInfo = performPhaseOneHandshake(bestServer, true);
+ "phase 2 : will perform PhaseOneH with the preferred RS.");
+ replicationServerInfo = performPhaseOneHandshake(
+ replicationServerInfo.getServerURL(), true);
+ // Store the refreshed info (server state for instance may have changed).
+ // NOTE(review): the handshake can return null; move under the null check.
+ replicationServerInfos.put(replicationServerInfo.getServerId(),
+ replicationServerInfo);
- if (serverInfo != null) // Handshake phase 1 exchange went well
-
+ if (replicationServerInfo != null)
{
+ // Handshake phase 1 exchange went well
+
// Compute in which status we are starting the session to tell the RS
ServerStatus initStatus =
- computeInitialServerStatus(serverInfo.getGenerationId(),
- serverInfo.getServerState(),
- serverInfo.getDegradedStatusThreshold(),
+ computeInitialServerStatus(replicationServerInfo.getGenerationId(),
+ replicationServerInfo.getServerState(),
+ replicationServerInfo.getDegradedStatusThreshold(),
this.getGenerationID());
- // Perfom session start (handshake phase 2)
- TopologyMsg topologyMsg = performPhaseTwoHandshake(bestServer,
- initStatus);
+ // Perform session start (handshake phase 2)
+ TopologyMsg topologyMsg = performPhaseTwoHandshake(
+ replicationServerInfo.getServerURL(), initStatus);
if (topologyMsg != null) // Handshake phase 2 exchange went well
-
{
try
{
@@ -681,7 +901,7 @@
* reconnection at that time to retrieve a server with our group
* id.
*/
- byte tmpRsGroupId = serverInfo.getGroupId();
+ byte tmpRsGroupId = replicationServerInfo.getGroupId();
boolean someServersWithSameGroupId =
hasSomeServerWithSameGroupId(topologyMsg.getRsList());
@@ -690,10 +910,10 @@
((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
{
replicationServer = session.getReadableRemoteAddress();
- maxSendWindow = serverInfo.getWindowSize();
- rsGroupId = serverInfo.getGroupId();
- rsServerId = serverInfo.getServerId();
- rsServerUrl = bestServer;
+ maxSendWindow = replicationServerInfo.getWindowSize();
+ rsGroupId = replicationServerInfo.getGroupId();
+ rsServerId = replicationServerInfo.getServerId();
+ rsServerUrl = replicationServerInfo.getServerURL();
receiveTopo(topologyMsg);
@@ -715,27 +935,27 @@
if (domain != null)
{
domain.sessionInitiated(
- initStatus, serverInfo.getServerState(),
- serverInfo.getGenerationId(),
- session);
+ initStatus, replicationServerInfo.getServerState(),
+ replicationServerInfo.getGenerationId(),
+ session);
}
if (getRsGroupId() != groupId)
{
- // Connected to replication server with wrong group id:
- // warn user and start poller to recover when a server with
- // right group id arrives...
- Message message =
- WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
- Byte.toString(groupId), Integer.toString(rsServerId),
- bestServer, Byte.toString(getRsGroupId()),
- baseDn.toString(), Integer.toString(serverId));
- logError(message);
- startSameGroupIdPoller();
+ // Connected to replication server with wrong group id:
+ // warn user. The same group id poller is no longer started here;
+ // presumably the best server check handles recovery (to be confirmed)
+ Message message =
+ WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
+ Byte.toString(groupId), Integer.toString(rsServerId),
+ replicationServerInfo.getServerURL(),
+ Byte.toString(getRsGroupId()),
+ baseDn.toString(), Integer.toString(serverId));
+ logError(message);
}
startRSHeartBeatMonitoring();
- if (serverInfo.getProtocolVersion()
- >= ProtocolVersion.REPLICATION_PROTOCOL_V3)
+ if (replicationServerInfo.getProtocolVersion() >=
+ ProtocolVersion.REPLICATION_PROTOCOL_V3)
{
startChangeTimeHeartBeatPublishing();
}
@@ -753,7 +973,7 @@
} catch (Exception e)
{
Message message = ERR_COMPUTING_FAKE_OPS.get(
- baseDn, bestServer,
+ baseDn, replicationServerInfo.getServerURL(),
e.getLocalizedMessage() + stackTraceToSingleLineString(e));
logError(message);
} finally
@@ -783,8 +1003,9 @@
{
connectPhaseLock.notify();
- if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
- (serverInfo.getGenerationId() == -1))
+ if ((replicationServerInfo.getGenerationId() ==
+ this.getGenerationID()) ||
+ (replicationServerInfo.getGenerationId() == -1))
{
Message message =
NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
@@ -801,7 +1022,7 @@
baseDn.toString(),
replicationServer,
Long.toString(this.getGenerationID()),
- Long.toString(serverInfo.getGenerationId()));
+ Long.toString(replicationServerInfo.getGenerationId()));
logError(message);
}
} else
@@ -908,7 +1129,7 @@
/**
* Connect to the provided server performing the first phase handshake
* (start messages exchange) and return the reply message from the replication
- * server, wrapped in a ServerInfo object.
+ * server, wrapped in a ReplicationServerInfo object.
*
* @param server Server to connect to.
* @param keepConnection Do we keep session opened or not after handshake.
@@ -917,10 +1138,10 @@
* @return The answer from the server . Null if could not
* get an answer.
*/
- private ServerInfo performPhaseOneHandshake(String server,
+ private ReplicationServerInfo performPhaseOneHandshake(String server,
boolean keepConnection)
{
- ServerInfo serverInfo = null;
+ ReplicationServerInfo replServerInfo = null;
// Parse server string.
int separator = server.lastIndexOf(':');
@@ -962,17 +1183,17 @@
ReplicationMsg msg = localSession.receive();
if (debugEnabled())
- {
- TRACER.debugInfo("In RB for " + baseDn +
- "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
- "\nAND RECEIVED:\n" + msg.toString());
- }
+ {
+ TRACER.debugInfo("In RB for " + baseDn +
+ "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
+ "\nAND RECEIVED:\n" + msg.toString());
+ }
// Wrap received message in a server info object
- serverInfo = ServerInfo.newServerInfo(msg);
+ replServerInfo = ReplicationServerInfo.newInstance(msg);
// Sanity check
- String repDn = serverInfo.getBaseDn();
+ String repDn = replServerInfo.getBaseDn();
if (!(this.baseDn.equals(repDn)))
{
Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -987,7 +1208,7 @@
* if it is an old replication server).
*/
protocolVersion = ProtocolVersion.minWithCurrent(
- serverInfo.getProtocolVersion());
+ replServerInfo.getProtocolVersion());
localSession.setProtocolVersion(protocolVersion);
@@ -1017,7 +1238,7 @@
error = true;
} catch (Exception e)
{
- if ( (e instanceof SocketTimeoutException) && debugEnabled() )
+ if ((e instanceof SocketTimeoutException) && debugEnabled())
{
TRACER.debugInfo("Timeout trying to connect to RS " + server +
" for dn: " + baseDn);
@@ -1068,7 +1289,7 @@
}
if (error)
{
- serverInfo = null;
+ replServerInfo = null;
} // Be sure to return null.
}
@@ -1080,7 +1301,7 @@
session = localSession;
}
- return serverInfo;
+ return replServerInfo;
}
/**
@@ -1126,11 +1347,11 @@
// Send our start msg.
ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg(
- baseDn, 0, 0, 0, 0,
- maxRcvWindow, heartbeatInterval, state,
- ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
- isSslEncryption,
- groupId);
+ baseDn, 0, 0, 0, 0,
+ maxRcvWindow, heartbeatInterval, state,
+ ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
+ isSslEncryption,
+ groupId);
localSession.publish(serverStartECLMsg);
// Read the ReplServerStartMsg that should come back.
@@ -1189,7 +1410,7 @@
error = true;
} catch (Exception e)
{
- if ( (e instanceof SocketTimeoutException) && debugEnabled() )
+ if ((e instanceof SocketTimeoutException) && debugEnabled())
{
TRACER.debugInfo("Timeout trying to connect to RS " + server +
" for dn: " + baseDn);
@@ -1282,7 +1503,7 @@
{
TRACER.debugInfo("In RB for " + baseDn +
"\nRB HANDSHAKE SENT:\n" + startECLSessionMsg.toString());
- // + "\nAND RECEIVED:\n" + topologyMsg.toString());
+ // + "\nAND RECEIVED:\n" + topologyMsg.toString());
}
// Alright set the timeout to the desired value
@@ -1340,13 +1561,13 @@
{
startSessionMsg =
new StartSessionMsg(
- initStatus,
- domain.getRefUrls(),
- domain.isAssured(),
- domain.getAssuredMode(),
- domain.getAssuredSdLevel());
+ initStatus,
+ domain.getRefUrls(),
+ domain.isAssured(),
+ domain.getAssuredMode(),
+ domain.getAssuredSdLevel());
startSessionMsg.setEclIncludes(
- domain.getEclInclude());
+ domain.getEclInclude());
} else
{
startSessionMsg =
@@ -1395,269 +1616,560 @@
/**
* Returns the replication server that best fits our need so that we can
- * connect to it.
- * This methods performs some filtering on the group id, then call
- * the real search for best server algorithm (searchForBestReplicationServer).
+ * connect to it or determine if we must disconnect from current one to
+ * re-connect to best server.
*
- * Note: this method put as public static for unit testing purpose.
+ * Note: this method is static for test purpose (access from unit tests)
*
+ * @param firstConnection True if we run this method for the very first
+ * connection of the broker. False if we run this method to determine if the
+ * replication server we are currently connected to is still the best or not.
+ * @param rsServerId The id of the replication server we are currently
+ * connected to. Only used when firstConnection is false.
* @param myState The local server state.
* @param rsInfos The list of available replication servers and their
- * associated information (choice will be made among them).
+ * associated information (choice will be made among them).
* @param localServerId The server id for the suffix we are working for.
* @param baseDn The suffix for which we are working for.
* @param groupId The groupId we prefer being connected to if possible
- * @return The computed best replication server.
+ * @param generationId The generation id we are using
+ * @return The computed best replication server. If the returned value is
+ * null, the best replication server is undetermined but the local server must
+ * disconnect (so the best replication server is another one than the current
+ * one). Null can only be returned when firstConnection is false.
*/
- public static String computeBestReplicationServer(ServerState myState,
- Map<String, ServerInfo> rsInfos, int localServerId,
- String baseDn, byte groupId)
+ public static ReplicationServerInfo computeBestReplicationServer(
+ boolean firstConnection, int rsServerId, ServerState myState,
+ Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
+ String baseDn, byte groupId, long generationId)
{
- /*
- * Preference is given to servers with the requested group id:
- * If there are some servers with the requested group id in the provided
- * server list, then we run the search algorithm only on them. If no server
- * with the requested group id, consider all of them.
- */
-
- // Filter for servers with same group id
- Map<String, ServerInfo> sameGroupIdRsInfos =
- new HashMap<String, ServerInfo>();
-
- for (String repServer : rsInfos.keySet())
- {
- ServerInfo serverInfo = rsInfos.get(repServer);
- if (serverInfo.getGroupId() == groupId)
- sameGroupIdRsInfos.put(repServer, serverInfo);
- }
-
- // Some servers with same group id ?
- if (sameGroupIdRsInfos.size() > 0)
- {
- return searchForBestReplicationServer(myState, sameGroupIdRsInfos,
- localServerId, baseDn);
- } else
- {
- return searchForBestReplicationServer(myState, rsInfos,
- localServerId, baseDn);
- }
- }
-
- /**
- * Returns the replication server that best fits our need so that we can
- * connect to it.
- *
- * Note: this method put as public static for unit testing purpose.
- *
- * @param myState The local server state.
- * @param rsInfos The list of available replication servers and their
- * associated information (choice will be made among them).
- * @param localServerID The server id for the suffix we are working for.
- * @param baseDn The suffix for which we are working for.
- * @return The computed best replication server.
- */
- private static String searchForBestReplicationServer(ServerState myState,
- Map<String, ServerInfo> rsInfos, int localServerID, String baseDn)
- {
- /*
- * Find replication servers who are up to date (or more up to date than us,
- * if for instance we failed and restarted, having sent some changes to the
- * RS but without having time to store our own state) regarding our own
- * server id. Then, among them, choose the server that is the most up to
- * date regarding the whole topology.
- *
- * If no server is up to date regarding our own server id, find the one who
- * is the most up to date regarding our server id.
- */
-
- // Should never happen (sanity check)
- if ((myState == null) || (rsInfos == null) || (rsInfos.size() < 1) ||
- (baseDn == null))
- {
- return null;
- }
// Shortcut, if only one server, this is the best
if (rsInfos.size() == 1)
{
- for (String repServer : rsInfos.keySet())
- return repServer;
+ return rsInfos.values().iterator().next();
}
- String bestServer = null;
- boolean bestServerIsLocal = false;
-
- // Servers up to dates with regard to our changes
- HashMap<String, ServerState> upToDateServers =
- new HashMap<String, ServerState>();
- // Servers late with regard to our changes
- HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
-
- /*
- * Start loop to differentiate up to date servers from late ones.
+ /**
+     * Apply some filtering criteria to determine the best servers list from
+     * the available ones. The criteria are, from most to least important:
+     * - replication server is locally configured
+     * - replication server has the same group id as the local DS one
+     * - replication server has the same generation id as the local DS one
+     * - replication server is up to date regarding changes generated by the
+     * local DS
+     * - replication server in the same VM as local DS one
*/
- ChangeNumber myChangeNumber = myState.getMaxChangeNumber(localServerID);
- if (myChangeNumber == null)
+ Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
+ Map<Integer, ReplicationServerInfo> newBestServers;
+    // The list of best replication servers is filtered with each criterion.
+    // At each criterion, the list is replaced with the filtered one if the
+    // filtering leaves some servers; otherwise the list is left as is and the
+    // filtering for the next criterion is applied, and so on.
+ for (int filterLevel = 1; filterLevel <= 4; filterLevel++)
{
- myChangeNumber = new ChangeNumber(0, 0, localServerID);
- }
- for (String repServer : rsInfos.keySet())
- {
-
- ServerState rsState = rsInfos.get(repServer).getServerState();
- ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerID);
- if (rsChangeNumber == null)
+ newBestServers = null;
+ switch (filterLevel)
{
- rsChangeNumber = new ChangeNumber(0, 0, localServerID);
+ case 1:
+ // Use only servers locally configured: those are servers declared in
+ // the local configuration. When the current method is called, for
+ // sure, at least one server from the list is locally configured
+ bestServers = filterServersLocallyConfigured(bestServers);
+ break;
+ case 2:
+ // Some servers with same group id ?
+ newBestServers = filterServersWithSameGroupId(bestServers, groupId);
+ if (newBestServers.size() > 0)
+ {
+ bestServers = newBestServers;
+ }
+ break;
+ case 3:
+ // Some servers with same generation id ?
+ newBestServers = filterServersWithSameGenerationId(bestServers,
+ generationId);
+ if (newBestServers.size() > 0)
+ {
+ // Ok some servers with the right generation id
+ bestServers = newBestServers;
+ // If some servers with the right generation id this is useful to
+ // run the local DS change criteria
+ newBestServers = filterServersWithAllLocalDSChanges(bestServers,
+ myState, localServerId);
+ if (newBestServers.size() > 0)
+ {
+ bestServers = newBestServers;
+ }
+ }
+ break;
+ case 4:
+ // Some servers in the local VM ?
+ newBestServers = filterServersInSameVM(bestServers);
+ if (newBestServers.size() > 0)
+ {
+ bestServers = newBestServers;
+ }
+ break;
}
+ }
- // Store state in right list
- if (myChangeNumber.olderOrEqual(rsChangeNumber))
+ /**
+     * Now apply the choice based on the weight to the best servers list
+ */
+ if (bestServers.size() > 1)
+ {
+ if (firstConnection)
{
- upToDateServers.put(repServer, rsState);
+        // We are not connected to a server yet
+ return computeBestServerForWeight(bestServers, -1, -1);
} else
{
- lateOnes.put(repServer, rsState);
+ // We are already connected to a RS: compute the best RS as far as the
+        // weights are concerned. If this is another one, some DS must
+ // disconnect.
+ return computeBestServerForWeight(bestServers, rsServerId,
+ localServerId);
}
- }
-
- if (upToDateServers.size() > 0)
- {
- /*
- * Some up to date servers, among them, choose either :
- * - The local one
- * - The one that has the maximum number of changes to send us.
- * This is the most up to date one regarding the whole topology.
- * This server is the one which has the less
- * difference with the topology server state.
- * For comparison, we need to compute the difference for each
- * server id with the topology server state.
- */
-
- Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
- upToDateServers.size(), baseDn, Integer.toString(localServerID));
- logError(message);
-
- /*
- * First of all, compute the virtual server state for the whole topology,
- * which is composed of the most up to date change numbers for
- * each server id in the topology.
- */
- ServerState topoState = new ServerState();
- for (ServerState curState : upToDateServers.values())
- {
-
- Iterator<Integer> it = curState.iterator();
- while (it.hasNext())
- {
- Integer sId = it.next();
- ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
- if (curSidCn == null)
- {
- curSidCn = new ChangeNumber(0, 0, sId);
- }
- // Update topology state
- topoState.update(curSidCn);
- }
- } // For up to date servers
-
- // Min of the max shifts
- long minShift = -1L;
- for (String upServer : upToDateServers.keySet())
- {
-
- /*
- * Compute the maximum difference between the time of a server id's
- * change number and the time of the matching server id's change
- * number in the topology server state.
- *
- * Note: we could have used the sequence number here instead of the
- * timestamp, but this would have caused a problem when the sequence
- * number loops and comes back to 0 (computation would have becomen
- * meaningless).
- */
- long shift = 0;
- ServerState curState = upToDateServers.get(upServer);
- Iterator<Integer> it = curState.iterator();
- while (it.hasNext())
- {
- Integer sId = it.next();
- ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
- if (curSidCn == null)
- {
- curSidCn = new ChangeNumber(0, 0, sId);
- }
- // Cannot be null as checked at construction time
- ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
- // Cannot be negative as topoState computed as being the max CN
- // for each server id in the topology
- long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
- shift +=tmpShift;
- }
-
- boolean upServerIsLocal =
- ReplicationServer.isLocalReplicationServer(upServer);
- if ((minShift < 0) // First time in loop
- || ((shift < minShift) && upServerIsLocal)
- || (((bestServerIsLocal == false) && (shift < minShift)))
- || ((bestServerIsLocal == false) && (upServerIsLocal &&
- (shift<(minShift + 60)) ))
- || (shift+120 < minShift))
- {
- // This server is even closer to topo state
- bestServer = upServer;
- bestServerIsLocal = upServerIsLocal;
- minShift = shift;
- }
- } // For up to date servers
-
} else
{
- /*
- * We could not find a replication server that has seen all the
- * changes that this server has already processed,
- */
- // lateOnes cannot be empty
- Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
- baseDn, lateOnes.size());
- logError(message);
-
- // Min of the shifts
- long minShift = -1L;
- for (String lateServer : lateOnes.keySet())
- {
- /*
- * Choose the server who is the closest to us regarding our server id
- * (this is the most up to date regarding our server id).
- */
- ServerState curState = lateOnes.get(lateServer);
- ChangeNumber ourSidCn = curState.getMaxChangeNumber(localServerID);
- if (ourSidCn == null)
- {
- ourSidCn = new ChangeNumber(0, 0, localServerID);
- }
- // Cannot be negative as our Cn for our server id is strictly
- // greater than those of the servers in late server list
- long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
-
- boolean lateServerisLocal =
- ReplicationServer.isLocalReplicationServer(lateServer);
- if ((minShift < 0) // First time in loop
- || ((tmpShift < minShift) && lateServerisLocal)
- || (((bestServerIsLocal == false) && (tmpShift < minShift)))
- || ((bestServerIsLocal == false) && (lateServerisLocal &&
- (tmpShift<(minShift + 60)) ))
- || (tmpShift+120 < minShift))
- {
- // This server is even closer to topo state
- bestServer = lateServer;
- bestServerIsLocal = lateServerisLocal;
- minShift = tmpShift;
- }
- } // For late servers
-
+ return bestServers.values().iterator().next();
}
- return bestServer;
+ }
+
+ /**
+ * Creates a new list that contains only replication servers that are locally
+ * configured.
+ * @param bestServers The list of replication servers to filter
+ * @return The sub list of replication servers locally configured
+ */
+ private static Map<Integer, ReplicationServerInfo>
+ filterServersLocallyConfigured(Map<Integer,
+ ReplicationServerInfo> bestServers)
+ {
+ Map<Integer, ReplicationServerInfo> result =
+ new HashMap<Integer, ReplicationServerInfo>();
+
+ for (Integer rsId : bestServers.keySet())
+ {
+ ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+ if (replicationServerInfo.isLocallyConfigured())
+ {
+ result.put(rsId, replicationServerInfo);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Creates a new list that contains only replication servers that have the
+ * passed group id, from a passed replication server list.
+ * @param bestServers The list of replication servers to filter
+ * @param groupId The group id that must match
+ * @return The sub list of replication servers matching the requested group id
+ * (which may be empty)
+ */
+ private static Map<Integer, ReplicationServerInfo>
+ filterServersWithSameGroupId(Map<Integer,
+ ReplicationServerInfo> bestServers, byte groupId)
+ {
+ Map<Integer, ReplicationServerInfo> result =
+ new HashMap<Integer, ReplicationServerInfo>();
+
+ for (Integer rsId : bestServers.keySet())
+ {
+ ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+ if (replicationServerInfo.getGroupId() == groupId)
+ {
+ result.put(rsId, replicationServerInfo);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Creates a new list that contains only replication servers that have the
+ * passed generation id, from a passed replication server list.
+ * @param bestServers The list of replication servers to filter
+ * @param generationId The generation id that must match
+ * @return The sub list of replication servers matching the requested
+ * generation id (which may be empty)
+ */
+ private static Map<Integer, ReplicationServerInfo>
+ filterServersWithSameGenerationId(Map<Integer,
+ ReplicationServerInfo> bestServers, long generationId)
+ {
+ Map<Integer, ReplicationServerInfo> result =
+ new HashMap<Integer, ReplicationServerInfo>();
+
+ for (Integer rsId : bestServers.keySet())
+ {
+ ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+ if (replicationServerInfo.getGenerationId() == generationId)
+ {
+ result.put(rsId, replicationServerInfo);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Creates a new list that contains only replication servers that have the
+ * latest changes from the passed DS, from a passed replication server list.
+ * @param bestServers The list of replication servers to filter
+ * @param localState The state of the local DS
+ * @param localServerId The server id to consider for the changes
+ * @return The sub list of replication servers that have the latest changes
+ * from the passed DS (which may be empty)
+ */
+ private static Map<Integer, ReplicationServerInfo>
+ filterServersWithAllLocalDSChanges(Map<Integer,
+ ReplicationServerInfo> bestServers, ServerState localState,
+ int localServerId)
+ {
+ Map<Integer, ReplicationServerInfo> upToDateServers =
+ new HashMap<Integer, ReplicationServerInfo>();
+ Map<Integer, ReplicationServerInfo> moreUpToDateServers =
+ new HashMap<Integer, ReplicationServerInfo>();
+
+ // Extract the change number of the latest change generated by the local
+ // server
+ ChangeNumber myChangeNumber = localState.getMaxChangeNumber(localServerId);
+ if (myChangeNumber == null)
+ {
+ myChangeNumber = new ChangeNumber(0, 0, localServerId);
+ }
+
+ /**
+ * Find replication servers who are up to date (or more up to date than us,
+ * if for instance we failed and restarted, having sent some changes to the
+ * RS but without having time to store our own state) regarding our own
+ * server id. If some servers more up to date, prefer this list.
+ */
+ for (Integer rsId : bestServers.keySet())
+ {
+ ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+ ServerState rsState = replicationServerInfo.getServerState();
+ ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerId);
+ if (rsChangeNumber == null)
+ {
+ rsChangeNumber = new ChangeNumber(0, 0, localServerId);
+ }
+
+ // Has this replication server the latest local change ?
+ if (myChangeNumber.olderOrEqual(rsChangeNumber))
+ {
+ if (myChangeNumber.equals(rsChangeNumber))
+ {
+ // This replication server has exactly the latest change from the
+ // local server
+ upToDateServers.put(rsId, replicationServerInfo);
+ } else
+ {
+ // This replication server is even more up to date than the local
+ // server
+ moreUpToDateServers.put(rsId, replicationServerInfo);
+ }
+ }
+ }
+ if (moreUpToDateServers.size() > 0)
+ {
+ // Prefer servers more up to date than local server
+ return moreUpToDateServers;
+ } else
+ {
+ return upToDateServers;
+ }
+ }
+
+ /**
+ * Creates a new list that contains only replication servers that are in the
+ * same VM as the local DS, from a passed replication server list.
+ * @param bestServers The list of replication servers to filter
+ * @return The sub list of replication servers being in the same VM as the
+ * local DS (which may be empty)
+ */
+ private static Map<Integer, ReplicationServerInfo> filterServersInSameVM(
+ Map<Integer, ReplicationServerInfo> bestServers)
+ {
+ Map<Integer, ReplicationServerInfo> result =
+ new HashMap<Integer, ReplicationServerInfo>();
+
+ for (Integer rsId : bestServers.keySet())
+ {
+ ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+ if (ReplicationServer.isLocalReplicationServer(
+ replicationServerInfo.getServerURL()))
+ {
+ result.put(rsId, replicationServerInfo);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Computes the best replication server the local server should be connected
+ * to so that the load is correctly spread across the topology, following the
+ * weights guidance.
+ * Warning: This method is expected to be called with at least 2 servers in
+ * bestServers
+ * Note: this method is static for test purpose (access from unit tests)
+ * @param bestServers The list of replication servers to consider
+ * @param currentRsServerId The replication server the local server is
+ * currently connected to. -1 if the local server is not yet connected
+ * to any replication server.
+ * @param localServerId The server id of the local server. This is not used
+ * when it is not connected to a replication server
+ * (currentRsServerId = -1)
+ * @return The replication server the local server should be connected to
+ * as far as the weight is concerned. This may be the currently used one if
+ * the weight is correctly spread. If the returned value is null, the best
+ * replication server is undetermined but the local server must disconnect
+ * (so the best replication server is another one than the current one).
+ */
+ public static ReplicationServerInfo computeBestServerForWeight(
+ Map<Integer, ReplicationServerInfo> bestServers, int currentRsServerId,
+ int localServerId)
+ {
+ /*
+     * - Compute the load goal of each RS, deducing it from the weights assigned
+ * to them.
+ * - Compute the current load of each RS, deducing it from the DSs
+ * currently connected to them.
+ * - Compute the differences between the load goals and the current loads of
+ * the RSs.
+ */
+ // Sum of the weights
+ int sumOfWeights = 0;
+ // Sum of the connected DSs
+ int sumOfConnectedDSs = 0;
+ for (ReplicationServerInfo replicationServerInfo : bestServers.values())
+ {
+ sumOfWeights += replicationServerInfo.getWeight();
+ sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
+ }
+ // Distance (difference) of the current loads to the load goals of each RS:
+ // key:server id, value: distance
+ Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
+    // Precision for the operations (number of significant digits)
+ // Default value of rounding method is HALF_UP for
+ // the MathContext
+ MathContext mathContext = new MathContext(32);
+ for (Integer rsId : bestServers.keySet())
+ {
+ ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+
+ int rsWeight = replicationServerInfo.getWeight();
+ // load goal = rs weight / sum of weights
+ BigDecimal loadGoalBd = (new BigDecimal(rsWeight)).divide(
+ new BigDecimal(sumOfWeights), mathContext);
+ BigDecimal currentLoadBd = BigDecimal.ZERO;
+ if (sumOfConnectedDSs != 0)
+ {
+ // current load = number of connected DSs / total number of DSs
+ int connectedDSs = replicationServerInfo.getConnectedDSNumber();
+ currentLoadBd = (new BigDecimal(connectedDSs)).divide(
+ new BigDecimal(sumOfConnectedDSs), mathContext);
+ }
+ // load distance = load goal - current load
+ BigDecimal loadDistanceBd =
+ loadGoalBd.subtract(currentLoadBd, mathContext);
+ loadDistances.put(rsId, loadDistanceBd);
+ }
+
+ if (currentRsServerId == -1)
+ {
+ // The local server is not connected yet
+
+ /*
+ * Find the server with the current highest distance to its load goal and
+ * choose it. Make an exception if every server is correctly balanced,
+ * that is every current load distances are equal to 0, in that case,
+ * choose the server with the highest weight
+ */
+ int bestRsId = 0; // If all server equal, return the first one
+ float highestDistance = Float.NEGATIVE_INFINITY;
+ boolean allRsWithZeroDistance = true;
+ int highestWeightRsId = -1;
+ int highestWeight = -1;
+ for (Integer rsId : bestServers.keySet())
+ {
+ float loadDistance = loadDistances.get(rsId).floatValue();
+ if (loadDistance > highestDistance)
+ {
+          // This server is farther from its balance point
+ bestRsId = rsId;
+ highestDistance = loadDistance;
+ }
+ if (loadDistance != (float)0)
+ {
+ allRsWithZeroDistance = false;
+ }
+ int weight = bestServers.get(rsId).getWeight();
+ if (weight > highestWeight)
+ {
+ // This server has a higher weight
+ highestWeightRsId = rsId;
+ highestWeight = weight;
+ }
+ }
+ // All servers with a 0 distance ?
+ if (allRsWithZeroDistance)
+ {
+        // Choose server with the highest weight
+ bestRsId = highestWeightRsId;
+ }
+ return bestServers.get(bestRsId);
+ } else
+ {
+ // The local server is currently connected to a RS, let's see if it must
+ // disconnect or not, taking the weights into account.
+
+ float currentLoadDistance =
+ loadDistances.get(currentRsServerId).floatValue();
+ if (currentLoadDistance < (float) 0)
+ {
+        // Too many DSs connected to the current RS, compared with its load
+ // goal:
+ // Determine the potential number of DSs to disconnect from the current
+ // RS and see if the local DS is part of them: the DSs that must
+ // disconnect are those with the lowest server id.
+ // Compute the sum of the distances of the load goals of the other RSs
+ BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
+ for (Integer rsId : bestServers.keySet())
+ {
+ if (rsId != currentRsServerId)
+ {
+ sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
+ loadDistances.get(rsId), mathContext);
+ }
+ }
+
+ if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > (float) 0)
+ {
+ // The average distance of the other RSs shows a lack of DSs.
+ // Compute the number of DSs to disconnect from the current RS,
+ // rounding to the nearest integer number. Do only this if there is
+ // no risk of yoyo effect: when the exact balance cannot be
+ // established due to the current number of DSs connected, do not
+ // disconnect a DS. A simple example where the balance cannot be
+ // reached is:
+ // - RS1 has weight 1 and 2 DSs
+ // - RS2 has weight 1 and 1 DS
+ // => disconnecting a DS from RS1 to reconnect it to RS2 would have no
+ // sense as this would lead to the reverse situation. In that case,
+ // the perfect balance cannot be reached and we must stick to the
+          // current situation, otherwise the DS would keep moving between the
+          // 2 RSs
+ float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
+ multiply(new BigDecimal(sumOfConnectedDSs), mathContext).
+ floatValue();
+ int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
+
+ // Avoid yoyo effect
+ if (overloadingDSsNumber == 1)
+ {
+ // What would be the new load distance for the current RS if
+ // we disconnect some DSs ?
+ ReplicationServerInfo currentReplicationServerInfo =
+ bestServers.get(currentRsServerId);
+
+ int currentRsWeight = currentReplicationServerInfo.getWeight();
+ BigDecimal currentRsWeightBd = new BigDecimal(currentRsWeight);
+ BigDecimal sumOfWeightsBd = new BigDecimal(sumOfWeights);
+ BigDecimal currentRsLoadGoalBd =
+ currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
+ BigDecimal potentialCurrentRsNewLoadBd = new BigDecimal(0);
+ if (sumOfConnectedDSs != 0)
+ {
+ int connectedDSs = currentReplicationServerInfo.
+ getConnectedDSNumber();
+ BigDecimal potentialNewConnectedDSsBd =
+ new BigDecimal(connectedDSs - 1);
+ BigDecimal sumOfConnectedDSsBd =
+ new BigDecimal(sumOfConnectedDSs);
+ potentialCurrentRsNewLoadBd =
+ potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
+ mathContext);
+ }
+ BigDecimal potentialCurrentRsNewLoadDistanceBd =
+ currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
+ mathContext);
+
+ // What would be the new load distance for the other RSs ?
+ BigDecimal additionalDsLoadBd =
+ (new BigDecimal(1)).divide(
+ new BigDecimal(sumOfConnectedDSs), mathContext);
+ BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
+ sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
+ mathContext);
+
+          // Now compare both values: we must not disconnect the DS if this
+ // is for going in a situation where the load distance of the other
+ // RSs is the opposite of the future load distance of the local RS
+ // or we would evaluate that we should disconnect just after being
+ // arrived on the new RS. But we should disconnect if we reach the
+ // perfect balance (both values are 0).
+ MathContext roundMc =
+ new MathContext(6, RoundingMode.DOWN);
+ BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
+ potentialCurrentRsNewLoadDistanceBd.round(roundMc);
+ BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
+ potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
+
+ if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
+ BigDecimal.ZERO) != 0)
+ && (potentialCurrentRsNewLoadDistanceBdRounded.equals(
+ potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
+ {
+ // Avoid the yoyo effect, and keep the local DS connected to its
+ // current RS
+ return bestServers.get(currentRsServerId);
+ }
+ }
+
+        // Prepare a sorted list (from lowest to highest) of DS server ids
+ // connected to the current RS
+ ReplicationServerInfo currentRsInfo =
+ bestServers.get(currentRsServerId);
+ List<Integer> serversConnectedToCurrentRS =
+ currentRsInfo.getConnectedDSs();
+ List<Integer> sortedServers = new ArrayList<Integer>(
+ serversConnectedToCurrentRS);
+ Collections.sort(sortedServers);
+
+ // Go through the list of DSs to disconnect and see if the local
+ // server is part of them.
+ int index = 0;
+ while (overloadingDSsNumber > 0)
+ {
+ int severToDisconnectId = sortedServers.get(index);
+ if (severToDisconnectId == localServerId)
+ {
+ // The local server is part of the DSs to disconnect
+ return null;
+ }
+ overloadingDSsNumber--;
+ index++;
+ }
+
+ // The local server is not part of the servers to disconnect from the
+ // current RS.
+ return bestServers.get(currentRsServerId);
+ } else
+ {
+ // The average distance of the other RSs does not show a lack of DSs:
+ // no need to disconnect any DS from the current RS.
+ return bestServers.get(currentRsServerId);
+ }
+ } else
+ {
+ // The RS load goal is reached or there are not enough DSs connected to
+ // it to reach it: do not disconnect from this RS and return rsInfo for
+ // this RS
+ return bestServers.get(currentRsServerId);
+ }
+ }
}
/**
@@ -1679,28 +2191,6 @@
}
/**
- * Starts the same group id poller.
- */
- private void startSameGroupIdPoller()
- {
- sameGroupIdPoller = new SameGroupIdPoller();
- sameGroupIdPoller.start();
- }
-
- /**
- * Stops the same group id poller.
- */
- private synchronized void stopSameGroupIdPoller()
- {
- if (sameGroupIdPoller != null)
- {
- sameGroupIdPoller.shutdown();
- sameGroupIdPoller.waitForShutdown();
- sameGroupIdPoller = null;
- }
- }
-
- /**
* Stop the heartbeat monitor thread.
*/
synchronized void stopRSHeartBeatMonitoring()
@@ -1926,7 +2416,8 @@
* Receive a message.
* This method is not multithread safe and should either always be
* called in a single thread or protected by a locking mechanism
- * before being called.
+   * before being called. This is a wrapper around the version taking a boolean
+   * argument, so that we do not have to modify existing tests.
*
* @return the received message
* @throws SocketTimeoutException if the timeout set by setSoTimeout
@@ -1934,6 +2425,26 @@
*/
public ReplicationMsg receive() throws SocketTimeoutException
{
+ return receive(false);
+ }
+
+ /**
+ * Receive a message.
+ * This method is not multithread safe and should either always be
+ * called in a single thread or protected by a locking mechanism
+ * before being called.
+ *
+ * @return the received message
+ * @throws SocketTimeoutException if the timeout set by setSoTimeout
+ * has expired
+ * @param allowReconnectionMechanism If true, this allows the reconnection
+ * mechanism to disconnect the broker if it detects that it should reconnect
+ * to another replication server because of some criteria defined by the
+ * algorithm where we choose a suitable replication server.
+ */
+ public ReplicationMsg receive(boolean allowReconnectionMechanism)
+ throws SocketTimeoutException
+ {
while (shutdown == false)
{
if (!connected)
@@ -1956,13 +2467,16 @@
{
WindowMsg windowMsg = (WindowMsg) msg;
sendWindow.release(windowMsg.getNumAck());
- }
- else if (msg instanceof TopologyMsg)
+ } else if (msg instanceof TopologyMsg)
{
- TopologyMsg topoMsg = (TopologyMsg)msg;
+ TopologyMsg topoMsg = (TopologyMsg) msg;
receiveTopo(topoMsg);
- }
- else if (msg instanceof StopMsg)
+ if (allowReconnectionMechanism)
+ {
+ // Reset wait time before next computation of best server
+ mustRunBestServerCheckingAlgorithm = 0;
+ }
+ } else if (msg instanceof StopMsg)
{
/*
* RS performs a proper disconnection
@@ -1974,8 +2488,7 @@
logError(message);
// Try to find a suitable RS
this.reStart(failingSession);
- }
- else if (msg instanceof MonitorMsg)
+ } else if (msg instanceof MonitorMsg)
{
// This is the response to a MonitorRequest that was sent earlier or
// the regular message of the monitoring publisher of the RS.
@@ -1997,16 +2510,53 @@
monitorResponse.notify();
}
- // Extract and store replication servers ServerStates
- rsStates = new HashMap<Integer, ServerState>();
+ // Update the replication servers ServerStates with new received info
it = monitorMsg.rsIterator();
while (it.hasNext())
{
int srvId = it.next();
- rsStates.put(srvId, monitorMsg.getRSServerState(srvId));
+ ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
+ if (rsInfo != null)
+ {
+ rsInfo.update(monitorMsg.getRSServerState(srvId));
+ }
}
- }
- else
+
+ // Now if it is allowed, compute the best replication server to see if
+ // it is still the one we are currently connected to. If not,
+ // disconnect properly and let the connection algorithm re-connect to
+ // best replication server
+ if (allowReconnectionMechanism)
+ {
+ mustRunBestServerCheckingAlgorithm++;
+ if (mustRunBestServerCheckingAlgorithm == 2)
+ {
+            // Stable topology (no topo msg for a few seconds): proceed with
+ // best server checking.
+ ReplicationServerInfo bestServerInfo =
+ computeBestReplicationServer(false, rsServerId, state,
+ replicationServerInfos, serverId, baseDn, groupId,
+ generationID);
+
+ if ((bestServerInfo == null) ||
+ (bestServerInfo.getServerId() != rsServerId))
+ {
+              // The best replication server is no longer the one we are
+ // currently using. Disconnect properly then reconnect.
+ Message message =
+ NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(),
+ Integer.toString(serverId),
+ Integer.toString(rsServerId),
+ rsServerUrl);
+ logError(message);
+ reStart();
+ }
+
+ // Reset wait time before next computation of best server
+ mustRunBestServerCheckingAlgorithm = 0;
+ }
+ }
+ } else
{
return msg;
}
@@ -2018,15 +2568,14 @@
if (shutdown == false)
{
if ((session == null) || (!session.closeInitiated()))
-
{
/*
* We did not initiate the close on our side, log an error message.
*/
Message message =
ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer,
- Integer.toString(rsServerId), baseDn.toString(),
- Integer.toString(serverId));
+ Integer.toString(rsServerId), baseDn.toString(),
+ Integer.toString(serverId));
logError(message);
}
this.reStart(failingSession);
@@ -2066,7 +2615,8 @@
}
}
} catch (InterruptedException e)
- {}
+ {
+ }
return replicaStates;
}
@@ -2081,7 +2631,7 @@
{
try
{
- updateDoneCount ++;
+ updateDoneCount++;
if ((updateDoneCount >= halfRcvWindow) && (session != null))
{
session.publish(new WindowMsg(updateDoneCount));
@@ -2103,10 +2653,9 @@
if (debugEnabled())
{
debugInfo("ReplicationBroker " + serverId + " is stopping and will" +
- " close the connection to replication server " + rsServerId + " for"
- + " domain " + baseDn);
+ " close the connection to replication server " + rsServerId + " for" +
+ " domain " + baseDn);
}
- stopSameGroupIdPoller();
stopRSHeartBeatMonitoring();
stopChangeTimeHeartBeatPublishing();
replicationServer = "stopped";
@@ -2237,8 +2786,8 @@
* @param groupId The new group id to use
*/
public boolean changeConfig(
- Collection<String> replicationServers, int window, long heartbeatInterval,
- byte groupId)
+ Collection<String> replicationServers, int window, long heartbeatInterval,
+ byte groupId)
{
// These parameters needs to be renegotiated with the ReplicationServer
// so if they have changed, that requires restarting the session with
@@ -2248,11 +2797,11 @@
// A new session is necessary only when information regarding
// the connection is modified
if ((servers == null) ||
- (!(replicationServers.size() == servers.size()
- && replicationServers.containsAll(servers))) ||
- window != this.maxRcvWindow ||
- heartbeatInterval != this.heartbeatInterval ||
- (groupId != this.groupId))
+ (!(replicationServers.size() == servers.size() && replicationServers.
+ containsAll(servers))) ||
+ window != this.maxRcvWindow ||
+ heartbeatInterval != this.heartbeatInterval ||
+ (groupId != this.groupId))
{
needToRestartSession = true;
}
@@ -2313,152 +2862,6 @@
}
/**
- * In case we are connected to a RS with a different group id, we use this
- * thread to poll presence of a RS with the same group id as ours. If a RS
- * with the same group id is available, we close the session to force
- * reconnection. Reconnection will choose a server with the same group id.
- */
- private class SameGroupIdPoller extends DirectoryThread
- {
-
- private boolean sameGroupIdPollershutdown = false;
- private boolean terminated = false;
- // Sleep interval in ms
- private static final int SAME_GROUP_ID_POLLER_PERIOD = 5000;
-
- public SameGroupIdPoller()
- {
- super("Replication Broker Same Group Id Poller for " + baseDn.toString() +
- " and group id " + groupId + " in server id " + serverId);
- }
-
- /**
- * Wait for the completion of the same group id poller.
- */
- public void waitForShutdown()
- {
- try
- {
- while (terminated == false)
- {
- Thread.sleep(50);
- }
- } catch (InterruptedException e)
- {
- // exit the loop if this thread is interrupted.
- }
- }
-
- /**
- * Shutdown the same group id poller.
- */
- public void shutdown()
- {
- sameGroupIdPollershutdown = true;
- }
-
- /**
- * Permanently look for RS with our group id and if found, break current
- * connection to force reconnection to a new server with the right group id.
- */
- @Override
- public void run()
- {
- boolean done = false;
-
- if (debugEnabled())
- {
- TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() +
- " started.");
- }
-
- while ((!done) && (!sameGroupIdPollershutdown))
- {
- // Sleep some time between checks
- try
- {
- Thread.sleep(SAME_GROUP_ID_POLLER_PERIOD);
- } catch (InterruptedException e)
- {
- // Stop as we are interrupted
- sameGroupIdPollershutdown = true;
- }
- synchronized (connectPhaseLock)
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("Running SameGroupIdPoller for: " +
- baseDn.toString());
- }
- if (session != null) // Check only if not already disconnected
-
- {
- for (String server : servers)
- {
- // Do not ask the RS we are connected to as it has for sure the
- // wrong group id
- if (server.equals(rsServerUrl))
- continue;
-
- // Connect to server and get reply message
- ServerInfo serverInfo =
- performPhaseOneHandshake(server, false);
-
- // Is it a server with our group id ?
- if (serverInfo != null)
- {
- if (groupId == serverInfo.getGroupId())
- {
- // Found one server with the same group id as us, disconnect
- // session to force reconnection to a server with same group
- // id.
- Message message = NOTE_NEW_SERVER_WITH_SAME_GROUP_ID.get(
- Byte.toString(groupId), baseDn.toString(),
- Integer.toString(serverId));
- logError(message);
-
- if (protocolVersion >=
- ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // V4 protocol introduces a StopMsg to properly end
- // communications
- try
- {
- session.publish(new StopMsg());
- } catch (IOException ioe)
- {
- // Anyway, going to close session, so nothing to do
- }
- }
- try
- {
- session.close();
- } catch (Exception e)
- {
- // The session was already closed, just ignore.
- }
- session = null;
- done = true; // Terminates thread as did its job.
-
- break;
- }
- }
- } // for server
-
- }
- }
- }
-
- terminated = true;
- if (debugEnabled())
- {
- TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() +
- " terminated.");
- }
- }
- }
-
- /**
* Signals the RS we just entered a new status.
* @param newStatus The status the local DS just entered
*/
@@ -2505,7 +2908,42 @@
*/
public List<RSInfo> getRsList()
{
- return rsList;
+ List<RSInfo> result = new ArrayList<RSInfo>();
+
+ for (ReplicationServerInfo replicationServerInfo :
+ replicationServerInfos.values())
+ {
+ result.add(replicationServerInfo.toRSInfo());
+ }
+ return result;
+ }
+
+ /**
+ * Computes the list of DSs connected to a particular RS.
+ * @param rsId The id of the RS whose connected DSs are wanted
+ * @param dsList The list of DSInfo to search for DSs connected to rsId
+ * @return The list of ids of the DSs connected to the server rsId
+ */
+ private List<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList)
+ {
+ List<Integer> connectedDSs = new ArrayList<Integer>();
+
+ if (rsServerId == rsId)
+ {
+ // When computing the connected DSs of the RS we are connected to,
+ // count the local DS too: the replication server does not send the
+ // DSInfo of the local DS in the topology message, so we must add
+ // ourselves as a connected server.
+ connectedDSs.add(serverId);
+ }
+
+ for (DSInfo dsInfo : dsList)
+ {
+ if (dsInfo.getRsId() == rsId)
+ connectedDSs.add(dsInfo.getDsId());
+ }
+
+ return connectedDSs;
}
/**
@@ -2516,13 +2954,49 @@
*/
public void receiveTopo(TopologyMsg topoMsg)
{
- // Store new lists
- synchronized(getDsList())
+ // Store new DS list
+ dsList = topoMsg.getDsList();
+
+ // Update replication server info list with the received topology
+ // information
+ List<Integer> rsToKeepList = new ArrayList<Integer>();
+ for (RSInfo rsInfo : topoMsg.getRsList())
{
- synchronized(getRsList())
+ int rsId = rsInfo.getId();
+ rsToKeepList.add(rsId); // Mark this server as still existing
+ List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
+ ReplicationServerInfo replicationServerInfo =
+ replicationServerInfos.get(rsId);
+ if (replicationServerInfo == null)
{
- dsList = topoMsg.getDsList();
- rsList = topoMsg.getRsList();
+ // New replication server, create info for it and add it to the list
+ replicationServerInfo =
+ new ReplicationServerInfo(rsInfo, connectedDSs);
+ // Set the locally configured flag for this new RS only if it is
+ // configured
+ updateRSInfoLocallyConfiguredStatus(replicationServerInfo);
+ replicationServerInfos.put(rsId, replicationServerInfo);
+ } else
+ {
+ // Update the existing info for the replication server
+ replicationServerInfo.update(rsInfo, connectedDSs);
+ }
+ }
+
+ /**
+ * Now remove any replication server that may have disappeared from the
+ * topology.
+ */
+ Iterator<Entry<Integer, ReplicationServerInfo>> rsInfoIt =
+ replicationServerInfos.entrySet().iterator();
+ while (rsInfoIt.hasNext())
+ {
+ Entry<Integer, ReplicationServerInfo> rsInfoEntry = rsInfoIt.next();
+ if (!rsToKeepList.contains(rsInfoEntry.getKey()))
+ {
+ // This replication server has quit the topology, remove it from the
+ // list
+ rsInfoIt.remove();
}
}
if (domain != null)
@@ -2536,6 +3010,7 @@
}
}
}
+
/**
* Check if the broker could not find any Replication Server and therefore
* connection attempt failed.
@@ -2557,16 +3032,15 @@
{
ctHeartbeatPublisherThread =
new CTHeartbeatPublisherThread(
- "Replication CN Heartbeat sender for " +
- baseDn + " with " + getReplicationServer(),
- session, changeTimeHeartbeatSendInterval, serverId);
+ "Replication CN Heartbeat sender for " +
+ baseDn + " with " + getReplicationServer(),
+ session, changeTimeHeartbeatSendInterval, serverId);
ctHeartbeatPublisherThread.start();
- }
- else
+ } else
{
if (debugEnabled())
TRACER.debugInfo(this +
- " is not configured to send CN heartbeat interval");
+ " is not configured to send CN heartbeat interval");
}
}
--
Gitblit v1.10.0