From 950cfbfa7d895b728f432500027df274d6b1d6c7 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 05 Mar 2013 16:50:35 +0000
Subject: [PATCH] OPENDJ-66 (CR-1365) DS does not failover between replication servers in different groups when configured explicitly for one of the groups
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 277 ++++++++++++++++++++++++++-----------------------------
1 file changed, 131 insertions(+), 146 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 1ac917a..487544e 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -23,17 +23,14 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2012 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.service;
-import java.net.UnknownHostException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
-import static org.opends.server.util.StaticUtils.collectionToString;
-import static org.opends.server.util.StaticUtils.isLocalAddress;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.server.util.StaticUtils.*;
import java.io.IOException;
import java.math.BigDecimal;
@@ -45,6 +42,7 @@
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -70,9 +68,9 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.ServerConstants;
-import org.opends.server.replication.server.ReplicationServer;
/**
* The broker for Multi-master Replication.
@@ -86,7 +84,10 @@
private static final DebugTracer TRACER = getTracer();
private volatile boolean shutdown = false;
private final Object startStopLock = new Object();
- private volatile Collection<String> servers;
+ /**
+ * Replication server URLs in the format "<code>hostname:port</code>".
+ */
+ private volatile Collection<String> replicationServerUrls;
private volatile boolean connected = false;
private volatile String replicationServer = "Not connected";
private volatile ProtocolSession session = null;
@@ -101,15 +102,15 @@
private int timeout = 0;
private short protocolVersion;
private ReplSessionSecurity replSessionSecurity;
- // My group id
- private byte groupId = (byte) -1;
- // The group id of the RS we are connected to
- private byte rsGroupId = (byte) -1;
- // The server id of the RS we are connected to
+ /** My group id. */
+ private byte groupId = -1;
+ /** The group id of the RS we are connected to. */
+ private byte rsGroupId = -1;
+ /** The server id of the RS we are connected to. */
private Integer rsServerId = -1;
- // The server URL of the RS we are connected to
+ /** The server URL of the RS we are connected to. */
private String rsServerUrl = null;
- // Our replication domain
+ /** Our replication domain. */
private ReplicationDomain domain = null;
/**
* This object is used as a conditional event to be notified about
@@ -182,7 +183,7 @@
* received, it is incremented. When it reaches 2, we run the checking
* algorithm to see if we must reconnect to another best replication server.
* Then we reset the value to 0. But when a topology message is received, the
- * integer is reseted to 0. This insures that we wait at least one monitoring
+ * integer is reset to 0. This ensures that we wait at least one monitoring
* publisher period before running the algorithm, but also that we wait at
* least for a monitoring period after the last received topology message
* (topology stabilization).
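
The javadoc above amounts to a small counter protocol. A minimal sketch of
that logic; the class and method names here are hypothetical, not the actual
OpenDJ members:

    /** Hypothetical sketch: wait two monitoring messages, reset on topology. */
    class ReconnectCheckCounter
    {
      private int monitorMsgCount = 0;

      /** Returns true when the best-server check should run. */
      synchronized boolean onMonitoringMessage()
      {
        if (++monitorMsgCount >= 2)
        {
          monitorMsgCount = 0;
          return true; // a full monitoring period has elapsed
        }
        return false;
      }

      /** A topology change restarts the stabilization wait. */
      synchronized void onTopologyMessage()
      {
        monitorMsgCount = 0;
      }
    }
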
@@ -247,19 +248,17 @@
/**
* Start the ReplicationBroker.
*
- * @param servers list of servers used
+ * @param replicationServers list of servers used
*/
- public void start(Collection<String> servers)
+ public void start(Collection<String> replicationServers)
{
synchronized (startStopLock)
{
- /*
- * Open Socket to the ReplicationServer Send the Start message
- */
+ // Open a socket to the ReplicationServer and send the Start message
shutdown = false;
- this.servers = servers;
+ this.replicationServerUrls = replicationServers;
- if (servers.size() < 1)
+ if (this.replicationServerUrls.size() < 1)
{
Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
logError(message);
@@ -348,7 +347,7 @@
replicationServerInfo.setLocallyConfigured(false);
return;
}
- for (String serverUrl : servers)
+ for (String serverUrl : replicationServerUrls)
{
if (isSameReplicationServerUrl(serverUrl, rsUrl))
{
@@ -428,12 +427,10 @@
}
// Now compare addresses, if at least one match, this is the same server
- for (int i = 0; i < rs1Addresses.length; i++)
+ for (InetAddress inetAddress1 : rs1Addresses)
{
- InetAddress inetAddress1 = rs1Addresses[i];
- for (int j = 0; j < rs2Addresses.length; j++)
+ for (InetAddress inetAddress2 : rs2Addresses)
{
- InetAddress inetAddress2 = rs2Addresses[j];
if (inetAddress2.equals(inetAddress1))
{
return true;
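
The loop rewritten above implements a resolve-and-intersect test: two URLs
denote the same server when any of their resolved addresses coincide. A
self-contained sketch of that technique using only java.net (port handling
omitted, helper name hypothetical):

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    /** True when the two host names resolve to at least one common address. */
    static boolean isSameHost(String host1, String host2)
    {
      try
      {
        InetAddress[] rs1Addresses = InetAddress.getAllByName(host1);
        InetAddress[] rs2Addresses = InetAddress.getAllByName(host2);
        for (InetAddress inetAddress1 : rs1Addresses)
        {
          for (InetAddress inetAddress2 : rs2Addresses)
          {
            if (inetAddress2.equals(inetAddress1))
            {
              return true;
            }
          }
        }
      }
      catch (UnknownHostException e)
      {
        // One of the names does not resolve: treat as different servers.
      }
      return false;
    }
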
@@ -454,7 +451,7 @@
{
private short protocolVersion;
private long generationId;
- private byte groupId = (byte) -1;
+ private byte groupId = -1;
private int serverId;
// Received server URL
private String serverURL;
@@ -517,8 +514,11 @@
}
/**
- * Constructs a ReplicationServerInfo object wrapping a ReplServerStartMsg.
- * @param replServerStartMsg The ReplServerStartMsg this object will wrap.
+ * Constructs a ReplicationServerInfo object wrapping a
+ * {@link ReplServerStartMsg}.
+ *
+ * @param replServerStartMsg
+ * The {@link ReplServerStartMsg} this object will wrap.
*/
private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg)
{
@@ -537,9 +537,10 @@
/**
* Constructs a ReplicationServerInfo object wrapping a
- * ReplServerStartDSMsg.
- * @param replServerStartDSMsg The ReplServerStartDSMsg this object will
- * wrap.
+ * {@link ReplServerStartDSMsg}.
+ *
+ * @param replServerStartDSMsg
+ * The {@link ReplServerStartDSMsg} this object will wrap.
*/
private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
{
@@ -756,9 +757,11 @@
* Returns a string representation of this object.
* @return A string representation of this object.
*/
+ @Override
public String toString()
{
- return "Url:"+ this.getServerURL() + " ServerId:" + this.serverId;
+ return "Url:" + this.serverURL + " ServerId:" + this.serverId
+ + " GroupId:" + this.groupId;
}
}
@@ -781,15 +784,14 @@
*/
private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
{
-
Map<Integer, ReplicationServerInfo> rsInfos =
new ConcurrentHashMap<Integer, ReplicationServerInfo>();
- for (String server : servers)
+ for (String serverUrl : replicationServerUrls)
{
// Connect to server and get info about it
ReplicationServerInfo replicationServerInfo =
- performPhaseOneHandshake(server, false, false);
+ performPhaseOneHandshake(serverUrl, false, false);
// Store server info in list
if (replicationServerInfo != null)
@@ -802,20 +804,27 @@
}
/**
- * Special aspects of connecting as ECL compared to connecting as data server
- * are :
- * - 1 single RS configured
- * - so no choice of the preferred RS
- * - ?? Heartbeat
- * - Start handshake is :
+ * Special aspects of connecting as ECL (External Change Log) compared to
+ * connecting as a data server are:
+ * <ul>
+ * <li>1 single RS configured</li>
+ * <li>so no choice of the preferred RS</li>
+ * <li>?? Heartbeat</li>
+ * <li>Start handshake is:
+ *
+ * <pre>
* Broker ---> StartECLMsg ---> RS
* <---- ReplServerStartMsg ---
* ---> StartSessionECLMsg --> RS
+ * </pre>
+ *
+ * </li>
+ * </ul>
*/
private void connectAsECL()
{
// FIXME:ECL List of RS to connect is for now limited to one RS only
- String bestServer = this.servers.iterator().next();
+ String bestServer = this.replicationServerUrls.iterator().next();
if (performPhaseOneHandshake(bestServer, true, true) != null)
{
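
The <pre> diagram in the javadoc above corresponds to roughly this exchange;
the message and method names below follow the diagram and are assumptions,
not verified OpenDJ protocol classes:

    // Hypothetical sketch of the ECL start handshake
    session.publish(startECLMsg);             // Broker ---> StartECLMsg ---> RS
    ReplicationMsg reply = session.receive(); //     <---- ReplServerStartMsg ---
    session.publish(startSessionECLMsg);      //     ---> StartSessionECLMsg --> RS
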
@@ -1323,17 +1332,7 @@
localSession.close();
}
- if (socket != null)
- {
- try
- {
- socket.close();
- }
- catch (IOException e)
- {
- // Ignore.
- }
- }
+ close(socket);
}
if (!hasConnected && errorMessage != null)
@@ -1361,9 +1360,9 @@
/**
- * Performs the second phase handshake (send StartSessionMsg and receive
- * TopologyMsg messages exchange) and return the reply message from the
- * replication server.
+ * Performs the second phase handshake for External Change Log (send
+ * StartSessionMsg and receive TopologyMsg messages exchange) and return the
+ * reply message from the replication server.
*
* @param server Server we are connecting with.
* @param initStatus The status we are starting with
@@ -1535,58 +1534,34 @@
* - replication server in the same VM as local DS one
*/
Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
- Map<Integer, ReplicationServerInfo> newBestServers;
// The list of best replication servers is filtered with each criteria. At
- // each criteria, the list is replaced with the filtered one if some there
+ // each criteria, the list is replaced with the filtered one if there
// are some servers from the filtering, otherwise, the list is left as is
// and the new filtering for the next criteria is applied and so on.
- for (int filterLevel = 1; filterLevel <= 4; filterLevel++)
+
+
+ // Use only servers locally configured: those are servers declared in
+ // the local configuration. When the current method is called, at least
+ // one server from the list is guaranteed to be locally configured
+ bestServers =
+ keepBest(filterServersLocallyConfigured(bestServers), bestServers);
+ // Some servers with same group id ?
+ bestServers =
+ keepBest(filterServersWithSameGroupId(bestServers, groupId),
+ bestServers);
+ // Some servers with same generation id ?
+ Map<Integer, ReplicationServerInfo> sameGenerationId =
+ filterServersWithSameGenerationId(bestServers, generationId);
+ if (sameGenerationId.size() > 0)
{
- newBestServers = null;
- switch (filterLevel)
- {
- case 1:
- // Use only servers locally configured: those are servers declared in
- // the local configuration. When the current method is called, for
- // sure, at least one server from the list is locally configured
- bestServers = filterServersLocallyConfigured(bestServers);
- break;
- case 2:
- // Some servers with same group id ?
- newBestServers = filterServersWithSameGroupId(bestServers, groupId);
- if (newBestServers.size() > 0)
- {
- bestServers = newBestServers;
- }
- break;
- case 3:
- // Some servers with same generation id ?
- newBestServers = filterServersWithSameGenerationId(bestServers,
- generationId);
- if (newBestServers.size() > 0)
- {
- // Ok some servers with the right generation id
- bestServers = newBestServers;
- // If some servers with the right generation id this is useful to
- // run the local DS change criteria
- newBestServers = filterServersWithAllLocalDSChanges(bestServers,
- myState, localServerId);
- if (newBestServers.size() > 0)
- {
- bestServers = newBestServers;
- }
- }
- break;
- case 4:
- // Some servers in the local VM ?
- newBestServers = filterServersInSameVM(bestServers);
- if (newBestServers.size() > 0)
- {
- bestServers = newBestServers;
- }
- break;
- }
+ // If some servers have the right generation id, it is useful to
+ // run the local DS change criteria
+ bestServers =
+ keepBest(filterServersWithAllLocalDSChanges(sameGenerationId,
+ myState, localServerId), sameGenerationId);
}
+ // Some servers in the local VM ?
+ bestServers = keepBest(filterServersInSameVM(bestServers), bestServers);
/**
* Now apply the choice base on the weight to the best servers list
@@ -1612,6 +1587,23 @@
}
/**
+ * If the filtered Map is not empty then it is returned, else return the
+ * original unfiltered Map.
+ *
+ * @return the best fit Map between the filtered Map and the original
+ * unfiltered Map.
+ */
+ private static <K, V> Map<K, V> keepBest(Map<K, V> filteredMap,
+ Map<K, V> unfilteredMap)
+ {
+ if (!filteredMap.isEmpty())
+ {
+ return filteredMap;
+ }
+ return unfilteredMap;
+ }
+
+ /**
* Creates a new list that contains only replication servers that are locally
* configured.
* @param bestServers The list of replication servers to filter
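
The contract of keepBest() is easy to check in isolation: an empty filter
result is discarded so the previous candidate set survives. A self-contained
demo with plain maps:

    Map<Integer, String> all = new HashMap<Integer, String>();
    all.put(1, "rs1");
    all.put(2, "rs2");
    Map<Integer, String> none = Collections.emptyMap();
    assert keepBest(none, all) == all; // filter matched nothing: keep all
    assert keepBest(all, none) == all; // filter matched something: keep it
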
@@ -1867,24 +1859,22 @@
// key:server id, value: distance
Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
// Precision for the operations (number of digits after the dot)
- // Default value of rounding method is HALF_UP for
- // the MathContext
- MathContext mathContext = new MathContext(32);
+ MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
for (Integer rsId : bestServers.keySet())
{
ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
int rsWeight = replicationServerInfo.getWeight();
// load goal = rs weight / sum of weights
- BigDecimal loadGoalBd = (new BigDecimal(rsWeight)).divide(
- new BigDecimal(sumOfWeights), mathContext);
+ BigDecimal loadGoalBd = BigDecimal.valueOf(rsWeight).divide(
+ BigDecimal.valueOf(sumOfWeights), mathContext);
BigDecimal currentLoadBd = BigDecimal.ZERO;
if (sumOfConnectedDSs != 0)
{
// current load = number of connected DSs / total number of DSs
int connectedDSs = replicationServerInfo.getConnectedDSNumber();
- currentLoadBd = (new BigDecimal(connectedDSs)).divide(
- new BigDecimal(sumOfConnectedDSs), mathContext);
+ currentLoadBd = BigDecimal.valueOf(connectedDSs).divide(
+ BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
}
// load distance = load goal - current load
BigDecimal loadDistanceBd =
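
A worked example of the arithmetic above, with made-up weights and counts,
using the same BigDecimal.valueOf()/divide() pattern as the patch:

    MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    int rsWeight = 2, sumOfWeights = 5;           // made-up example values
    int connectedDSs = 3, sumOfConnectedDSs = 10;

    // load goal = rs weight / sum of weights = 2/5 = 0.4
    BigDecimal loadGoalBd = BigDecimal.valueOf(rsWeight).divide(
        BigDecimal.valueOf(sumOfWeights), mathContext);
    // current load = connected DSs / total DSs = 3/10 = 0.3
    BigDecimal currentLoadBd = BigDecimal.valueOf(connectedDSs).divide(
        BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
    // load distance = 0.4 - 0.3 = 0.1 > 0: this RS can still accept DSs
    BigDecimal loadDistanceBd = loadGoalBd.subtract(currentLoadBd, mathContext);
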
@@ -1916,7 +1906,7 @@
bestRsId = rsId;
highestDistance = loadDistance;
}
- if (loadDistance != (float)0)
+ if (loadDistance != 0)
{
allRsWithZeroDistance = false;
}
@@ -1931,7 +1921,7 @@
// All servers with a 0 distance ?
if (allRsWithZeroDistance)
{
- // Choose server withe the highest weight
+ // Choose server with the highest weight
bestRsId = highestWeightRsId;
}
return bestServers.get(bestRsId);
@@ -1942,7 +1932,7 @@
float currentLoadDistance =
loadDistances.get(currentRsServerId).floatValue();
- if (currentLoadDistance < (float) 0)
+ if (currentLoadDistance < 0)
{
// Too much DSs connected to the current RS, compared with its load
// goal:
@@ -1960,7 +1950,7 @@
}
}
- if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > (float) 0)
+ if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
{
// The average distance of the other RSs shows a lack of DSs.
// Compute the number of DSs to disconnect from the current RS,
@@ -1977,8 +1967,8 @@
// current situation, otherwise the DS would keep move between the 2
// RSs
float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
- multiply(new BigDecimal(sumOfConnectedDSs), mathContext).
- floatValue();
+ multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
+ .floatValue();
int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
// Avoid yoyo effect
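
Continuing with made-up figures, the multiply-then-round step above behaves
like this (plain arithmetic, no OpenDJ types involved):

    // Suppose the other RSs lack 0.25 of load in total and 10 DSs exist.
    float notRoundedOverloadingDSsNumber = 0.25f * 10; // 2.5 DSs in excess
    int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber); // 3
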
@@ -1990,34 +1980,34 @@
bestServers.get(currentRsServerId);
int currentRsWeight = currentReplicationServerInfo.getWeight();
- BigDecimal currentRsWeightBd = new BigDecimal(currentRsWeight);
- BigDecimal sumOfWeightsBd = new BigDecimal(sumOfWeights);
+ BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
+ BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
BigDecimal currentRsLoadGoalBd =
currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
- BigDecimal potentialCurrentRsNewLoadBd = new BigDecimal(0);
+ BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
if (sumOfConnectedDSs != 0)
{
int connectedDSs = currentReplicationServerInfo.
getConnectedDSNumber();
BigDecimal potentialNewConnectedDSsBd =
- new BigDecimal(connectedDSs - 1);
+ BigDecimal.valueOf(connectedDSs - 1);
BigDecimal sumOfConnectedDSsBd =
- new BigDecimal(sumOfConnectedDSs);
+ BigDecimal.valueOf(sumOfConnectedDSs);
potentialCurrentRsNewLoadBd =
potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
- mathContext);
+ mathContext);
}
BigDecimal potentialCurrentRsNewLoadDistanceBd =
currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
- mathContext);
+ mathContext);
// What would be the new load distance for the other RSs ?
BigDecimal additionalDsLoadBd =
- (new BigDecimal(1)).divide(
- new BigDecimal(sumOfConnectedDSs), mathContext);
+ BigDecimal.ONE.divide(
+ BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
- mathContext);
+ mathContext);
// Now compare both values: we must no disconnect the DS if this
// is for going in a situation where the load distance of the other
@@ -2139,7 +2129,7 @@
if (failingSession == session)
{
connected = false;
- rsGroupId = (byte) -1;
+ rsGroupId = -1;
rsServerId = -1;
rsServerUrl = null;
session = null;
@@ -2291,8 +2281,7 @@
// connectPhaseLock because it can be blocking and we don't
// want to hold off reconnection in case the connection dropped.
credit =
- currentWindowSemaphore.tryAcquire(
- (long) 500, TimeUnit.MILLISECONDS);
+ currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
} else
{
credit = true;
@@ -2466,14 +2455,12 @@
{
// This is the response to a MonitorRequest that was sent earlier or
// the regular message of the monitoring publisher of the RS.
+ MonitorMsg monitorMsg = (MonitorMsg) msg;
// Extract and store replicas ServerStates
replicaStates = new HashMap<Integer, ServerState>();
- MonitorMsg monitorMsg = (MonitorMsg) msg;
- Iterator<Integer> it = monitorMsg.ldapIterator();
- while (it.hasNext())
+ for (int srvId : toIterable(monitorMsg.ldapIterator()))
{
- int srvId = it.next();
replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
}
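
The new enhanced-for loops rely on a toIterable() helper pulled in by the
widened StaticUtils wildcard import. A minimal equivalent of such an adapter
(the real StaticUtils signature may differ):

    /** Adapts a one-shot Iterator to the enhanced-for syntax. */
    static <T> Iterable<T> toIterable(final Iterator<T> iterator)
    {
      return new Iterable<T>()
      {
        @Override
        public Iterator<T> iterator()
        {
          return iterator;
        }
      };
    }
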
@@ -2485,10 +2472,8 @@
}
// Update the replication servers ServerStates with new received info
- it = monitorMsg.rsIterator();
- while (it.hasNext())
+ for (int srvId : toIterable(monitorMsg.rsIterator()))
{
- int srvId = it.next();
ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
if (rsInfo != null)
{
@@ -2662,7 +2647,7 @@
stopRSHeartBeatMonitoring();
stopChangeTimeHeartBeatPublishing();
replicationServer = "stopped";
- rsGroupId = (byte) -1;
+ rsGroupId = -1;
rsServerId = -1;
rsServerUrl = null;
if (session != null)
@@ -2781,17 +2766,17 @@
// A new session is necessary only when information regarding
// the connection is modified
- if ((servers == null) ||
- (!(replicationServers.size() == servers.size() && replicationServers.
- containsAll(servers))) ||
- window != this.maxRcvWindow ||
- heartbeatInterval != this.heartbeatInterval ||
- (groupId != this.groupId))
+ if (this.replicationServerUrls == null
+ || replicationServers.size() != this.replicationServerUrls.size()
+ || !replicationServers.containsAll(this.replicationServerUrls)
+ || window != this.maxRcvWindow
+ || heartbeatInterval != this.heartbeatInterval
+ || groupId != this.groupId)
{
needToRestartSession = true;
}
- this.servers = replicationServers;
+ this.replicationServerUrls = replicationServers;
this.rcvWindow = window;
this.maxRcvWindow = window;
this.halfRcvWindow = window / 2;
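
The rewritten condition tests collection equality with size() plus
containsAll(), which is exact only when neither collection contains
duplicates (true for configured server URLs). As a standalone predicate,
with a hypothetical helper name not present in the patch:

    /** Same elements regardless of order, assuming no duplicates. */
    static <T> boolean sameElements(Collection<T> a, Collection<T> b)
    {
      return a != null && b != null
          && a.size() == b.size()
          && a.containsAll(b);
    }
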
--
Gitblit v1.10.0