| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.math.BigDecimal; |
| | |
| | | import java.net.Socket; |
| | | import java.net.SocketException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.net.UnknownHostException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.Collections; |
| | |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.ServerConstants; |
| | | |
| | | /** |
| | | * The broker for Multi-master Replication. |
| | |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | private volatile boolean shutdown = false; |
| | | private final Object startStopLock = new Object(); |
| | | /** |
| | | * Replication server URLs, in the format "<code>hostname:port</code>". |
| | | */ |
| | | private volatile Collection<String> replicationServerUrls; |
| | | private volatile boolean connected = false; |
| | | private volatile String replicationServer = "Not connected"; |
| | | private volatile ProtocolSession session = null; |
| | |
| | | private int timeout = 0; |
| | | private short protocolVersion; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | /** My group id. */ |
| | | private byte groupId = -1; |
| | | /** The group id of the RS we are connected to. */ |
| | | private byte rsGroupId = -1; |
| | | /** The server id of the RS we are connected to. */ |
| | | private Integer rsServerId = -1; |
| | | /** The server URL of the RS we are connected to. */ |
| | | private String rsServerUrl = null; |
| | | /** Our replication domain. */ |
| | | private ReplicationDomain domain = null; |
| | | /** |
| | | * This object is used as a conditional event to be notified about |
| | |
| | | * received, it is incremented. When it reaches 2, we run the checking |
| | | * algorithm to see if we must reconnect to another best replication server. |
| | | * Then we reset the value to 0. But when a topology message is received, the |
| | | * integer is reset to 0. This ensures that we wait at least one monitoring |
| | | * publisher period before running the algorithm, but also that we wait at |
| | | * least for a monitoring period after the last received topology message |
| | | * (topology stabilization). |
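| | | * <p> |
| | | * An illustrative timeline (assuming a monitoring publisher period P): |
| | | * <pre> |
| | | * t0 : TopologyMsg received -> counter reset to 0 |
| | | * t0+P : MonitorMsg received -> counter = 1 |
| | | * t0+2P : MonitorMsg received -> counter = 2 -> run check, reset to 0 |
| | | * </pre> |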
| | |
| | | /** |
| | | * Start the ReplicationBroker. |
| | | * |
| | | * @param replicationServers list of replication servers ("hostname:port") to connect to |
| | | */ |
| | | public void start(Collection<String> replicationServers) |
| | | { |
| | | synchronized (startStopLock) |
| | | { |
| | | // Open a socket to the ReplicationServer and send the Start message. |
| | | shutdown = false; |
| | | this.replicationServerUrls = replicationServers; |
| | | |
| | | if (this.replicationServerUrls.size() < 1) |
| | | { |
| | | Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get(); |
| | | logError(message); |
| | |
| | | replicationServerInfo.setLocallyConfigured(false); |
| | | return; |
| | | } |
| | | for (String serverUrl : replicationServerUrls) |
| | | { |
| | | if (isSameReplicationServerUrl(serverUrl, rsUrl)) |
| | | { |
| | |
| | | } |
| | | |
| | | // Now compare addresses, if at least one match, this is the same server |
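| | | // (For instance, "localhost:1389" and "127.0.0.1:1389" would typically |
| | | // resolve to the same InetAddress and thus denote the same server.) |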
| | | for (InetAddress inetAddress1 : rs1Addresses) |
| | | { |
| | | for (InetAddress inetAddress2 : rs2Addresses) |
| | | { |
| | | if (inetAddress2.equals(inetAddress1)) |
| | | { |
| | | return true; |
| | |
| | | { |
| | | private short protocolVersion; |
| | | private long generationId; |
| | | private byte groupId = -1; |
| | | private int serverId; |
| | | // Received server URL |
| | | private String serverURL; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Constructs a ReplicationServerInfo object wrapping a |
| | | * {@link ReplServerStartMsg}. |
| | | * |
| | | * @param replServerStartMsg |
| | | * The {@link ReplServerStartMsg} this object will wrap. |
| | | */ |
| | | private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg) |
| | | { |
| | |
| | | |
| | | /** |
| | | * Constructs a ReplicationServerInfo object wrapping a |
| | | * {@link ReplServerStartDSMsg}. |
| | | * |
| | | * @param replServerStartDSMsg |
| | | * The {@link ReplServerStartDSMsg} this object will wrap. |
| | | */ |
| | | private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg) |
| | | { |
| | |
| | | * Returns a string representation of this object. |
| | | * @return A string representation of this object. |
| | | */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "Url:"+ this.getServerURL() + " ServerId:" + this.serverId; |
| | | return "Url:" + this.serverURL + " ServerId:" + this.serverId |
| | | + " GroupId:" + this.groupId; |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo() |
| | | { |
| | | |
| | | Map<Integer, ReplicationServerInfo> rsInfos = |
| | | new ConcurrentHashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | for (String serverUrl : replicationServerUrls) |
| | | { |
| | | // Connect to server and get info about it |
| | | ReplicationServerInfo replicationServerInfo = |
| | | performPhaseOneHandshake(serverUrl, false, false); |
| | | |
| | | // Store server info in list |
| | | if (replicationServerInfo != null) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Special aspects of connecting as ECL (External Change Log) compared to |
| | | * connecting as a data server are: |
| | | * <ul> |
| | | * <li>1 single RS configured</li> |
| | | * <li>so no choice of the preferred RS</li> |
| | | * <li>?? Heartbeat</li> |
| | | * <li>Start handshake is: |
| | | * |
| | | * <pre> |
| | | * Broker ---> StartECLMsg ---> RS |
| | | * <---- ReplServerStartMsg --- |
| | | * ---> StartSessionECLMsg --> RS |
| | | * </pre> |
| | | * |
| | | * </li> |
| | | * </ul> |
| | | */ |
| | | private void connectAsECL() |
| | | { |
| | | // FIXME:ECL List of RS to connect is for now limited to one RS only |
| | | String bestServer = this.replicationServerUrls.iterator().next(); |
| | | |
| | | if (performPhaseOneHandshake(bestServer, true, true) != null) |
| | | { |
| | |
| | | localSession.close(); |
| | | } |
| | | |
| | | close(socket); |
| | | } |
| | | |
| | | if (!hasConnected && errorMessage != null) |
| | |
| | | |
| | | |
| | | /** |
| | | * Performs the second phase handshake for External Change Log (send |
| | | * StartSessionMsg and receive TopologyMsg messages exchange) and returns |
| | | * the reply message from the replication server. |
| | | * |
| | | * @param server Server we are connecting with. |
| | | * @param initStatus The status we are starting with |
| | |
| | | * - replication server in the same VM as local DS one |
| | | */ |
| | | Map<Integer, ReplicationServerInfo> bestServers = rsInfos; |
| | | // The list of best replication servers is filtered with each criterion. |
| | | // At each step, the list is replaced with the filtered one if the |
| | | // filtering leaves some servers; otherwise the list is left as is and |
| | | // the filter for the next criterion is applied, and so on. |
| | | |
| | | |
| | | // Use only servers locally configured: those are servers declared in |
| | | // the local configuration. When the current method is called, it is |
| | | // guaranteed that at least one server from the list is locally configured |
| | | bestServers = |
| | | keepBest(filterServersLocallyConfigured(bestServers), bestServers); |
| | | // Some servers with same group id ? |
| | | bestServers = |
| | | keepBest(filterServersWithSameGroupId(bestServers, groupId), |
| | | bestServers); |
| | | // Some servers with same generation id ? |
| | | Map<Integer, ReplicationServerInfo> sameGenerationId = |
| | | filterServersWithSameGenerationId(bestServers, generationId); |
| | | if (sameGenerationId.size() > 0) |
| | | { |
| | | // If there are some servers with the right generation id, it is useful |
| | | // to run the local DS change criteria |
| | | bestServers = |
| | | keepBest(filterServersWithAllLocalDSChanges(sameGenerationId, |
| | | myState, localServerId), sameGenerationId); |
| | | } |
| | | // Some servers in the local VM ? |
| | | bestServers = keepBest(filterServersInSameVM(bestServers), bestServers); |
| | | |
| | | /** |
| | | * Now apply the choice based on the weight to the best servers list |
| | |
| | | } |
| | | |
| | | /** |
| | | * If the filtered Map is not empty then it is returned, else return the |
| | | * original unfiltered Map. |
| | | * |
| | | * @param filteredMap |
| | | * the filtered Map |
| | | * @param unfilteredMap |
| | | * the original unfiltered Map |
| | | * @return the best fit Map between the filtered Map and the original |
| | | * unfiltered Map. |
| | | */ |
| | | private static <K, V> Map<K, V> keepBest(Map<K, V> filteredMap, |
| | | Map<K, V> unfilteredMap) |
| | | { |
| | | if (!filteredMap.isEmpty()) |
| | | { |
| | | return filteredMap; |
| | | } |
| | | return unfilteredMap; |
| | | } |
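| | | // A minimal usage sketch (hypothetical candidates) of the keepBest() |
| | | // contract: an over-strict filter that matches nothing leaves the |
| | | // candidate list untouched instead of emptying it. |
| | | // Map<Integer, ReplicationServerInfo> candidates = ...; |
| | | // Map<Integer, ReplicationServerInfo> filtered = Collections.emptyMap(); |
| | | // assert keepBest(filtered, candidates) == candidates; |
| | | // assert keepBest(candidates, candidates) == candidates; |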
| | | |
| | | /** |
| | | * Creates a new list that contains only replication servers that are locally |
| | | * configured. |
| | | * @param bestServers The list of replication servers to filter |
| | |
| | | // key:server id, value: distance |
| | | Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>(); |
| | | // Precision for the operations (number of significant digits), |
| | | // with explicit HALF_UP rounding (the MathContext default) |
| | | MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP); |
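| | | // Illustrative figures (hypothetical) for the formulas used below: with |
| | | // two RSs of weights 2 and 1 (sumOfWeights = 3) and 6 connected DSs |
| | | // split 5/1 (sumOfConnectedDSs = 6): |
| | | // RS1: load goal = 2/3, current load = 5/6, load distance = -1/6 |
| | | // RS2: load goal = 1/3, current load = 1/6, load distance = +1/6 |
| | | // RS2 has the highest (positive) load distance: it is the best candidate. |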
| | | for (Integer rsId : bestServers.keySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | |
| | | int rsWeight = replicationServerInfo.getWeight(); |
| | | // load goal = rs weight / sum of weights |
| | | BigDecimal loadGoalBd = BigDecimal.valueOf(rsWeight).divide( |
| | | BigDecimal.valueOf(sumOfWeights), mathContext); |
| | | BigDecimal currentLoadBd = BigDecimal.ZERO; |
| | | if (sumOfConnectedDSs != 0) |
| | | { |
| | | // current load = number of connected DSs / total number of DSs |
| | | int connectedDSs = replicationServerInfo.getConnectedDSNumber(); |
| | | currentLoadBd = BigDecimal.valueOf(connectedDSs).divide( |
| | | BigDecimal.valueOf(sumOfConnectedDSs), mathContext); |
| | | } |
| | | // load distance = load goal - current load |
| | | BigDecimal loadDistanceBd = |
| | |
| | | bestRsId = rsId; |
| | | highestDistance = loadDistance; |
| | | } |
| | | if (loadDistance != 0) |
| | | { |
| | | allRsWithZeroDistance = false; |
| | | } |
| | |
| | | // All servers with a 0 distance ? |
| | | if (allRsWithZeroDistance) |
| | | { |
| | | // Choose server with the highest weight |
| | | bestRsId = highestWeightRsId; |
| | | } |
| | | return bestServers.get(bestRsId); |
| | |
| | | |
| | | float currentLoadDistance = |
| | | loadDistances.get(currentRsServerId).floatValue(); |
| | | if (currentLoadDistance < 0) |
| | | { |
| | | // Too many DSs connected to the current RS, compared with its load |
| | | // goal: |
| | |
| | | } |
| | | } |
| | | |
| | | if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0) |
| | | { |
| | | // The average distance of the other RSs shows a lack of DSs. |
| | | // Compute the number of DSs to disconnect from the current RS, |
| | |
| | | // current situation, otherwise the DS would keep moving between the 2 |
| | | // RSs |
| | | float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd. |
| | | multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext) |
| | | .floatValue(); |
| | | int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber); |
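| | | // E.g. (hypothetical figures): if the other RSs lack a combined load of |
| | | // 1/6 and sumOfConnectedDSs = 6, then 1/6 * 6 = 1 DS should disconnect |
| | | // from the current, overloaded RS. |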
| | | |
| | | // Avoid yoyo effect |
| | |
| | | bestServers.get(currentRsServerId); |
| | | |
| | | int currentRsWeight = currentReplicationServerInfo.getWeight(); |
| | | BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight); |
| | | BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights); |
| | | BigDecimal currentRsLoadGoalBd = |
| | | currentRsWeightBd.divide(sumOfWeightsBd, mathContext); |
| | | BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO; |
| | | if (sumOfConnectedDSs != 0) |
| | | { |
| | | int connectedDSs = currentReplicationServerInfo. |
| | | getConnectedDSNumber(); |
| | | BigDecimal potentialNewConnectedDSsBd = |
| | | BigDecimal.valueOf(connectedDSs - 1); |
| | | BigDecimal sumOfConnectedDSsBd = |
| | | BigDecimal.valueOf(sumOfConnectedDSs); |
| | | potentialCurrentRsNewLoadBd = |
| | | potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd, |
| | | mathContext); |
| | | } |
| | | BigDecimal potentialCurrentRsNewLoadDistanceBd = |
| | | currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd, |
| | | mathContext); |
| | | |
| | | // What would be the new load distance for the other RSs ? |
| | | BigDecimal additionalDsLoadBd = |
| | | BigDecimal.ONE.divide( |
| | | BigDecimal.valueOf(sumOfConnectedDSs), mathContext); |
| | | BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd = |
| | | sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd, |
| | | mathContext); |
| | | |
| | | // Now compare both values: we must not disconnect the DS if this |
| | | // is for going in a situation where the load distance of the other |
| | |
| | | if (failingSession == session) |
| | | { |
| | | connected = false; |
| | | rsGroupId = -1; |
| | | rsServerId = -1; |
| | | rsServerUrl = null; |
| | | session = null; |
| | |
| | | // connectPhaseLock because it can be blocking and we don't |
| | | // want to hold off reconnection in case the connection dropped. |
| | | credit = |
| | | currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS); |
| | | } |
| | | else |
| | | { |
| | | credit = true; |
| | |
| | | { |
| | | // This is the response to a MonitorRequest that was sent earlier or |
| | | // the regular message of the monitoring publisher of the RS. |
| | | |
| | | // Extract and store replicas ServerStates |
| | | replicaStates = new HashMap<Integer, ServerState>(); |
| | | MonitorMsg monitorMsg = (MonitorMsg) msg; |
| | | for (int srvId : toIterable(monitorMsg.ldapIterator())) |
| | | { |
| | | replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId)); |
| | | } |
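| | | // toIterable() (imported via the StaticUtils wildcard above) is assumed |
| | | // to adapt an Iterator for use in an enhanced for loop; a minimal sketch |
| | | // of such an adapter: |
| | | // static <T> Iterable<T> toIterable(final Iterator<T> iterator) |
| | | // { |
| | | //   return new Iterable<T>() |
| | | //   { |
| | | //     @Override |
| | | //     public Iterator<T> iterator() |
| | | //     { |
| | | //       return iterator; |
| | | //     } |
| | | //   }; |
| | | // } |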
| | | |
| | |
| | | } |
| | | |
| | | // Update the replication servers ServerStates with new received info |
| | | for (int srvId : toIterable(monitorMsg.rsIterator())) |
| | | { |
| | | ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId); |
| | | if (rsInfo != null) |
| | | { |
| | |
| | | stopRSHeartBeatMonitoring(); |
| | | stopChangeTimeHeartBeatPublishing(); |
| | | replicationServer = "stopped"; |
| | | rsGroupId = -1; |
| | | rsServerId = -1; |
| | | rsServerUrl = null; |
| | | if (session != null) |
| | |
| | | |
| | | // A new session is necessary only when information regarding |
| | | // the connection is modified |
| | | if (this.replicationServerUrls == null |
| | | || replicationServers.size() != this.replicationServerUrls.size() |
| | | || !replicationServers.containsAll(this.replicationServerUrls) |
| | | || window != this.maxRcvWindow |
| | | || heartbeatInterval != this.heartbeatInterval |
| | | || groupId != this.groupId) |
| | | { |
| | | needToRestartSession = true; |
| | | } |
| | | |
| | | this.replicationServerUrls = replicationServers; |
| | | this.rcvWindow = window; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window / 2; |