| | |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | | import java.net.UnknownHostException; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.io.IOException; |
| | | import java.math.BigDecimal; |
| | | import java.math.MathContext; |
| | | import java.math.RoundingMode; |
| | | import java.net.ConnectException; |
| | | import java.net.InetAddress; |
| | | import java.net.InetSocketAddress; |
| | |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.Collections; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.DSInfo; |
| | |
| | | private Semaphore sendWindow; |
| | | private int maxSendWindow; |
| | | private int rcvWindow = 100; |
| | | private int halfRcvWindow = rcvWindow / 2; |
| | | private int maxRcvWindow = rcvWindow; |
| | | private int timeout = 0; |
| | | private short protocolVersion; |
| | |
| | | private String rsServerUrl = null; |
| | | // Our replication domain |
| | | private ReplicationDomain domain = null; |
| | | |
| | | /** |
| | | * This object is used as a conditional event to be notified about |
| | | * the reception of monitor information from the Replication Server. |
| | | */ |
| | | private final MutableBoolean monitorResponse = new MutableBoolean(false); |
| | | |
| | | /** |
| | | * A Map containing the ServerStates of all the replicas in the topology |
| | | * as seen by the ReplicationServer the last time it was polled or the last |
| | | * time it published monitoring information. |
| | | */ |
| | | private HashMap<Integer, ServerState> replicaStates = |
| | | new HashMap<Integer, ServerState>(); |
| | | |
| | | /** |
| | | * A Map containing the ServerStates of all the replication servers in the |
| | | * topology as seen by the ReplicationServer the last time it was polled or |
| | | * the last time it published monitoring information. |
| | | */ |
| | | private HashMap<Integer, ServerState> rsStates = |
| | | new HashMap<Integer, ServerState>(); |
| | | |
| | | /** |
| | | * The expected duration in milliseconds between heartbeats received |
| | | * from the replication server. Zero means heartbeats are off. |
| | |
| | | */ |
| | | private boolean connectionError = false; |
| | | private final Object connectPhaseLock = new Object(); |
| | | |
| | | // Same group id poller thread |
| | | private SameGroupIdPoller sameGroupIdPoller = null; |
| | | |
| | | /** |
| | | * The thread that publishes messages to the RS containing the current |
| | | * change time of this DS. |
| | | */ |
| | | private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null; |
| | | /** |
| | | * The expected period in milliseconds between these messages are sent |
| | | * to the replication server. Zero means heartbeats are off. |
| | | */ |
| | | private long changeTimeHeartbeatSendInterval = 0; |
| | | /* |
| | |
| | | // Info for other DSs. |
| | | // Warning: does not contain info for us (for our server id) |
| | | private List<DSInfo> dsList = new ArrayList<DSInfo>(); |
| | | // Info for other RSs. |
| | | private List<RSInfo> rsList = new ArrayList<RSInfo>(); |
| | | |
| | | private long generationID; |
| | | private int updateDoneCount = 0; |
| | | private boolean connectRequiresRecovery = false; |
| | | /** |
| | | * The map of replication server info initialized at connection time and |
| | | * regularly updated. This is used to decide to which best suitable |
| | | * replication server one wants to connect. |
| | | * Key: replication server id |
| | | * Value: replication server info for the matching replication server id |
| | | */ |
| | | private Map<Integer, ReplicationServerInfo> replicationServerInfos = null; |
| | | /** |
| | | * This integer defines when the best replication server checking algorithm |
| | | * should be engaged. |
| | | * Every time a monitoring message (each monitoring publisher period) is |
| | | * received, it is incremented. When it reaches 2, we run the checking |
| | | * algorithm to see if we must reconnect to another best replication server. |
| | | * Then we reset the value to 0. But when a topology message is received, the |
| | | * integer is reset to 0. This ensures that we wait at least one monitoring |
| | | * publisher period before running the algorithm, but also that we wait at |
| | | * least for a monitoring period after the last received topology message |
| | | * (topology stabilization). |
| | | */ |
| | | private int mustRunBestServerCheckingAlgorithm = 0; |
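| | | // Illustrative (hypothetical) timeline for this counter, assuming a |
| | | // monitoring publisher period of 3 seconds: |
| | | //   t=0s monitoring message received -> counter = 1 |
| | | //   t=1s topology message received   -> counter reset to 0 |
| | | //   t=3s monitoring message received -> counter = 1 |
| | | //   t=6s monitoring message received -> counter = 2 -> run the best |
| | | //        replication server checking algorithm, then reset to 0 |
| | | // The net effect is that the check only runs after a full monitoring |
| | | // period with no topology change. |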
| | | |
| | | /** |
| | | * Creates a new ReplicationServer Broker for a particular ReplicationDomain. |
| | |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window / 2; |
| | | this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval; |
| | | } |
| | | |
| | |
| | | return rsServerId; |
| | | } |
| | | |
| | | /** |
| | | * Gets the server id. |
| | | * @return The server id |
| | | */ |
| | |
| | | private long getGenerationID() |
| | | { |
| | | if (domain != null) |
| | | { |
| | | // Update the generation id |
| | | generationID = domain.getGenerationID(); |
| | | } |
| | | return generationID; |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Sets the locally configured flag for the passed ReplicationServerInfo |
| | | * object, analyzing the local configuration. |
| | | * @param replicationServerInfo The replication server info object whose |
| | | * locally configured flag is to be set |
| | | */ |
| | | private void updateRSInfoLocallyConfiguredStatus( |
| | | ReplicationServerInfo replicationServerInfo) |
| | | { |
| | | // Determine if the passed ReplicationServerInfo has a URL that is present |
| | | // in the locally configured replication servers |
| | | String rsUrl = replicationServerInfo.getServerURL(); |
| | | if (rsUrl == null) |
| | | { |
| | | // The ReplicationServerInfo has been generated from a server with |
| | | // no URL in TopologyMsg (i.e: with replication protocol version < 4): |
| | | // ignore this server as we do not know how to connect to it |
| | | replicationServerInfo.setLocallyConfigured(false); |
| | | return; |
| | | } |
| | | for (String serverUrl : servers) |
| | | { |
| | | if (isSameReplicationServerUrl(serverUrl, rsUrl)) |
| | | { |
| | | // This RS is locally configured, mark this |
| | | replicationServerInfo.setLocallyConfigured(true); |
| | | return; |
| | | } |
| | | } |
| | | replicationServerInfo.setLocallyConfigured(false); |
| | | } |
| | | |
| | | /** |
| | | * Compares 2 replication servers addresses and returns true if they both |
| | | * represent the same replication server instance. |
| | | * @param rs1Url Replication server 1 address |
| | | * @param rs2Url Replication server 2 address |
| | | * @return True if both replication server addresses represent the same |
| | | * replication server instance, false otherwise. |
| | | */ |
| | | private static boolean isSameReplicationServerUrl(String rs1Url, |
| | | String rs2Url) |
| | | { |
| | | // Get and compare ports of RS1 and RS2 |
| | | int separator1 = rs1Url.lastIndexOf(':'); |
| | | if (separator1 < 0) |
| | | { |
| | | // Not a RS url: should not happen |
| | | return false; |
| | | } |
| | | int rs1Port = Integer.parseInt(rs1Url.substring(separator1 + 1)); |
| | | |
| | | int separator2 = rs2Url.lastIndexOf(':'); |
| | | if (separator2 < 0) |
| | | { |
| | | // Not a RS url: should not happen |
| | | return false; |
| | | } |
| | | int rs2Port = Integer.parseInt(rs2Url.substring(separator2 + 1)); |
| | | |
| | | if (rs1Port != rs2Port) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | // Get and compare addresses of RS1 and RS2 |
| | | String rs1 = rs1Url.substring(0, separator1); |
| | | InetAddress[] rs1Addresses = null; |
| | | try |
| | | { |
| | | if (rs1.equals("localhost") || rs1.equals("127.0.0.1")) |
| | | { |
| | | // Replace localhost with the local official hostname |
| | | rs1 = InetAddress.getLocalHost().getHostName(); |
| | | } |
| | | rs1Addresses = InetAddress.getAllByName(rs1); |
| | | } catch (UnknownHostException ex) |
| | | { |
| | | // Unknown RS: should not happen |
| | | return false; |
| | | } |
| | | |
| | | String rs2 = rs2Url.substring(0, separator2); |
| | | InetAddress[] rs2Addresses = null; |
| | | try |
| | | { |
| | | if (rs2.equals("localhost") || rs2.equals("127.0.0.1")) |
| | | { |
| | | // Replace localhost with the local official hostname |
| | | rs2 = InetAddress.getLocalHost().getHostName(); |
| | | } |
| | | rs2Addresses = InetAddress.getAllByName(rs2); |
| | | } catch (UnknownHostException ex) |
| | | { |
| | | // Unknown RS: should not happen |
| | | return false; |
| | | } |
| | | |
| | | // Now compare addresses, if at least one match, this is the same server |
| | | for (int i = 0; i < rs1Addresses.length; i++) |
| | | { |
| | | InetAddress inetAddress1 = rs1Addresses[i]; |
| | | for (int j = 0; j < rs2Addresses.length; j++) |
| | | { |
| | | InetAddress inetAddress2 = rs2Addresses[j]; |
| | | if (inetAddress2.equals(inetAddress1)) |
| | | { |
| | | return true; |
| | | } |
| | | } |
| | | } |
| | | return false; |
| | | } |
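| | | // Illustrative (hypothetical) comparisons, assuming host1.example.com is |
| | | // the local host and resolves to 192.168.1.10: |
| | | //   isSameReplicationServerUrl("host1.example.com:8989", "localhost:8989") |
| | | //     -> true (same port; "localhost" is replaced by the local hostname and |
| | | //              both names resolve to a common address) |
| | | //   isSameReplicationServerUrl("host1.example.com:8989", |
| | | //     "host1.example.com:8990") -> false (ports differ) |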
| | | |
| | | /** |
| | | * Bag class for keeping info we get from a replication server in order to |
| | | * compute the best one to connect to. This is in fact a wrapper to a |
| | | * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be |
| | | * updated with a info coming from received topology messages or monitoring |
| | | * messages. |
| | | */ |
| | | public static class ReplicationServerInfo |
| | | { |
| | | private short protocolVersion; |
| | | private long generationId; |
| | | private byte groupId = (byte) -1; |
| | | private int serverId; |
| | | // Received server URL |
| | | private String serverURL; |
| | | private String baseDn = null; |
| | | private int windowSize; |
| | | private ServerState serverState = null; |
| | | private boolean sslEncryption; |
| | | private int degradedStatusThreshold = -1; |
| | | // Keeps the 1 value if created with a ReplServerStartMsg |
| | | private int weight = 1; |
| | | // Keeps the 0 value if created with a ReplServerStartMsg |
| | | private int connectedDSNumber = 0; |
| | | private List<Integer> connectedDSs = null; |
| | | // Is this RS locally configured ? (the RS is recognized as a usable server) |
| | | private boolean locallyConfigured = true; |
| | | |
| | | /** |
| | | * Create a new instance of ReplicationServerInfo wrapping the passed |
| | | * message. |
| | | * @param msg Message to wrap. |
| | | * @return The new instance wrapping the passed message. |
| | | * @throws IllegalArgumentException If the passed message has an unexpected |
| | | * type. |
| | | */ |
| | | public static ReplicationServerInfo newInstance( |
| | | ReplicationMsg msg) throws IllegalArgumentException |
| | | { |
| | | if (msg instanceof ReplServerStartMsg) |
| | | { |
| | | // This is a ReplServerStartMsg (RS uses protocol V3 or under) |
| | | ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg) msg; |
| | | return new ReplicationServerInfo(replServerStartMsg); |
| | | } else if (msg instanceof ReplServerStartDSMsg) |
| | | { |
| | | // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher) |
| | | ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg) msg; |
| | | return new ReplicationServerInfo(replServerStartDSMsg); |
| | | } |
| | | |
| | | // Unsupported message type: should not happen |
| | |
| | | } |
| | | |
| | | /** |
| | | * Constructs a ReplicationServerInfo object wrapping a ReplServerStartMsg. |
| | | * @param replServerStartMsg The ReplServerStartMsg this object will wrap. |
| | | */ |
| | | private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg) |
| | | { |
| | | this.protocolVersion = replServerStartMsg.getVersion(); |
| | | this.generationId = replServerStartMsg.getGenerationId(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Constructs a ReplicationServerInfo object wrapping a |
| | | * ReplServerStartDSMsg. |
| | | * @param replServerStartDSMsg The ReplServerStartDSMsg this object will |
| | | * wrap. |
| | | */ |
| | | private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg) |
| | | { |
| | | this.protocolVersion = replServerStartDSMsg.getVersion(); |
| | | this.generationId = replServerStartDSMsg.getGenerationId(); |
| | |
| | | { |
| | | return connectedDSNumber; |
| | | } |
| | | |
| | | /** |
| | | * Constructs a new replication server info with the passed RSInfo |
| | | * internal values and the passed connected DSs. |
| | | * @param rsInfo The RSinfo to use for the update |
| | | * @param connectedDSs The new connected DSs |
| | | */ |
| | | public ReplicationServerInfo(RSInfo rsInfo, List<Integer> connectedDSs) |
| | | { |
| | | this.serverId = rsInfo.getId(); |
| | | this.serverURL = rsInfo.getServerUrl(); |
| | | this.generationId = rsInfo.getGenerationId(); |
| | | this.groupId = rsInfo.getGroupId(); |
| | | this.weight = rsInfo.getWeight(); |
| | | this.connectedDSs = connectedDSs; |
| | | this.connectedDSNumber = connectedDSs.size(); |
| | | } |
| | | |
| | | /** |
| | | * Converts the object to a RSInfo object. |
| | | * @return The RSInfo object matching this object. |
| | | */ |
| | | public RSInfo toRSInfo() |
| | | { |
| | | return new RSInfo(serverId, serverURL, generationId, groupId, weight); |
| | | } |
| | | |
| | | /** |
| | | * Updates replication server info with the passed RSInfo internal values |
| | | * and the passed connected DSs. |
| | | * @param rsInfo The RSinfo to use for the update |
| | | * @param connectedDSs The new connected DSs |
| | | */ |
| | | public void update(RSInfo rsInfo, List<Integer> connectedDSs) |
| | | { |
| | | this.generationId = rsInfo.getGenerationId(); |
| | | this.groupId = rsInfo.getGroupId(); |
| | | this.weight = rsInfo.getWeight(); |
| | | this.connectedDSs = connectedDSs; |
| | | this.connectedDSNumber = connectedDSs.size(); |
| | | } |
| | | |
| | | /** |
| | | * Updates replication server info with the passed server state. |
| | | * @param serverState The ServerState to use for the update |
| | | */ |
| | | public void update(ServerState serverState) |
| | | { |
| | | if (this.serverState != null) |
| | | { |
| | | this.serverState.update(serverState); |
| | | } else |
| | | { |
| | | this.serverState = serverState; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Gets the list of DSs connected to this replication server. |
| | | * @return the list of connected DS server ids |
| | | */ |
| | | public List<Integer> getConnectedDSs() |
| | | { |
| | | return connectedDSs; |
| | | } |
| | | |
| | | /** |
| | | * Gets the locally configured status for this RS. |
| | | * @return the locallyConfigured |
| | | */ |
| | | public boolean isLocallyConfigured() |
| | | { |
| | | return locallyConfigured; |
| | | } |
| | | |
| | | /** |
| | | * Sets the locally configured status for this RS. |
| | | * @param locallyConfigured the locallyConfigured to set |
| | | */ |
| | | public void setLocallyConfigured(boolean locallyConfigured) |
| | | { |
| | | this.locallyConfigured = locallyConfigured; |
| | | } |
| | | } |
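| | | // Typical lifecycle of a ReplicationServerInfo instance (a sketch based on |
| | | // the methods above): it is first built by newInstance() from the |
| | | // ReplServerStartMsg or ReplServerStartDSMsg received during handshake |
| | | // phase 1, then refreshed with update(RSInfo, connectedDSs) when a topology |
| | | // message is received, and with update(ServerState) when monitoring |
| | | // information arrives. |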
| | | |
| | | private void connect() |
| | | { |
| | | if (this.baseDn.compareToIgnoreCase( |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT) == 0) |
| | | { |
| | | connectAsECL(); |
| | | } else |
| | | { |
| | | connectAsDataServer(); |
| | | } |
| | |
| | | * able to choose the more suitable. |
| | | * @return the collected information. |
| | | */ |
| | | private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo() |
| | | { |
| | | |
| | | Map<Integer, ReplicationServerInfo> rsInfos = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | for (String server : servers) |
| | | { |
| | | // Connect to server and get info about it |
| | | ReplicationServerInfo replicationServerInfo = |
| | | performPhaseOneHandshake(server, false); |
| | | |
| | | // Store server info in list |
| | | if (replicationServerInfo != null) |
| | | { |
| | | rsInfos.put(replicationServerInfo.getServerId(), replicationServerInfo); |
| | | } |
| | | } |
| | | |
| | |
| | | * are : |
| | | * - 1 single RS configured |
| | | * - so no choice of the preferred RS |
| | | * - No same groupID polling |
| | | * - ?? Heartbeat |
| | | * - Start handshake is : |
| | | * Broker ---> StartECLMsg ---> RS |
| | |
| | | // FIXME:ECL List of RS to connect is for now limited to one RS only |
| | | String bestServer = this.servers.iterator().next(); |
| | | |
| | | ReplServerStartDSMsg inReplServerStartDSMsg = performECLPhaseOneHandshake( |
| | | bestServer, true); |
| | | |
| | | if (inReplServerStartDSMsg != null) |
| | | performECLPhaseTwoHandshake(bestServer); |
| | | } |
| | | |
| | |
| | | * |
| | | * phase 1: |
| | | * DS --- ServerStartMsg ---> RS |
| | | * DS <--- ReplServerStartDSMsg --- RS |
| | | * phase 2: |
| | | * DS --- StartSessionMsg ---> RS |
| | | * DS <--- TopologyMsg --- RS |
| | |
| | | } |
| | | |
| | | // Stop any existing poller and heartbeat monitor from a previous session. |
| | | stopSameGroupIdPoller(); |
| | | stopRSHeartBeatMonitoring(); |
| | | stopChangeTimeHeartBeatPublishing(); |
| | | mustRunBestServerCheckingAlgorithm = 0; |
| | | |
| | | boolean newServerWithSameGroupId = false; |
| | | synchronized (connectPhaseLock) |
| | |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " + |
| | | " order to elect the preferred one"); |
| | | " order to elect the preferred one"); |
| | | |
| | | // Get info from every available replication servers |
| | | replicationServerInfos = collectReplicationServersInfo(); |
| | | |
| | | ReplicationServerInfo replicationServerInfo = null; |
| | | |
| | | if (replicationServerInfos.size() > 0) |
| | | { |
| | | // At least one server answered, find the best one. |
| | | replicationServerInfo = computeBestReplicationServer(true, -1, state, |
| | | replicationServerInfos, serverId, baseDn, groupId, |
| | | this.getGenerationID()); |
| | | |
| | | // Best found, now initialize connection to this one (handshake phase 1) |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "phase 2 : will perform PhaseOneH with the preferred RS."); |
| | | serverInfo = performPhaseOneHandshake(bestServer, true); |
| | | "phase 2 : will perform PhaseOneH with the preferred RS."); |
| | | replicationServerInfo = performPhaseOneHandshake( |
| | | replicationServerInfo.getServerURL(), true); |
| | | // Update replication server info with potentially more up to date data |
| | | // (server state for instance may have changed) |
| | | replicationServerInfos.put(replicationServerInfo.getServerId(), |
| | | replicationServerInfo); |
| | | |
| | | if (replicationServerInfo != null) |
| | | { |
| | | // Handshake phase 1 exchange went well |
| | | |
| | | // Compute in which status we are starting the session to tell the RS |
| | | ServerStatus initStatus = |
| | | computeInitialServerStatus(replicationServerInfo.getGenerationId(), |
| | | replicationServerInfo.getServerState(), |
| | | replicationServerInfo.getDegradedStatusThreshold(), |
| | | this.getGenerationID()); |
| | | |
| | | // Perform session start (handshake phase 2) |
| | | TopologyMsg topologyMsg = performPhaseTwoHandshake( |
| | | replicationServerInfo.getServerURL(), initStatus); |
| | | |
| | | if (topologyMsg != null) // Handshake phase 2 exchange went well |
| | | |
| | | { |
| | | try |
| | | { |
| | |
| | | * reconnection at that time to retrieve a server with our group |
| | | * id. |
| | | */ |
| | | byte tmpRsGroupId = replicationServerInfo.getGroupId(); |
| | | boolean someServersWithSameGroupId = |
| | | hasSomeServerWithSameGroupId(topologyMsg.getRsList()); |
| | | |
| | |
| | | ((tmpRsGroupId != groupId) && !someServersWithSameGroupId)) |
| | | { |
| | | replicationServer = session.getReadableRemoteAddress(); |
| | | maxSendWindow = replicationServerInfo.getWindowSize(); |
| | | rsGroupId = replicationServerInfo.getGroupId(); |
| | | rsServerId = replicationServerInfo.getServerId(); |
| | | rsServerUrl = replicationServerInfo.getServerURL(); |
| | | |
| | | receiveTopo(topologyMsg); |
| | | |
| | |
| | | if (domain != null) |
| | | { |
| | | domain.sessionInitiated( |
| | | initStatus, replicationServerInfo.getServerState(), |
| | | replicationServerInfo.getGenerationId(), |
| | | session); |
| | | } |
| | | |
| | | if (getRsGroupId() != groupId) |
| | | { |
| | | // Connected to replication server with wrong group id: |
| | | // warn user and start poller to recover when a server with |
| | | // right group id arrives... |
| | | Message message = |
| | | WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get( |
| | | Byte.toString(groupId), Integer.toString(rsServerId), |
| | | replicationServerInfo.getServerURL(), |
| | | Byte.toString(getRsGroupId()), |
| | | baseDn.toString(), Integer.toString(serverId)); |
| | | logError(message); |
| | | } |
| | | startRSHeartBeatMonitoring(); |
| | | if (replicationServerInfo.getProtocolVersion() >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V3) |
| | | { |
| | | startChangeTimeHeartBeatPublishing(); |
| | | } |
| | |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_COMPUTING_FAKE_OPS.get( |
| | | baseDn, replicationServerInfo.getServerURL(), |
| | | e.getLocalizedMessage() + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } finally |
| | |
| | | { |
| | | connectPhaseLock.notify(); |
| | | |
| | | if ((replicationServerInfo.getGenerationId() == |
| | | this.getGenerationID()) || |
| | | (replicationServerInfo.getGenerationId() == -1)) |
| | | { |
| | | Message message = |
| | | NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get( |
| | |
| | | baseDn.toString(), |
| | | replicationServer, |
| | | Long.toString(this.getGenerationID()), |
| | | Long.toString(replicationServerInfo.getGenerationId())); |
| | | logError(message); |
| | | } |
| | | } else |
| | |
| | | /** |
| | | * Connect to the provided server performing the first phase handshake |
| | | * (start messages exchange) and return the reply message from the replication |
| | | * server, wrapped in a ReplicationServerInfo object. |
| | | * |
| | | * @param server Server to connect to. |
| | | * @param keepConnection Do we keep session opened or not after handshake. |
| | |
| | | * @return The answer from the server, or null if no answer could be |
| | | * obtained. |
| | | */ |
| | | private ReplicationServerInfo performPhaseOneHandshake(String server, |
| | | boolean keepConnection) |
| | | { |
| | | ReplicationServerInfo replServerInfo = null; |
| | | |
| | | // Parse server string. |
| | | int separator = server.lastIndexOf(':'); |
| | |
| | | ReplicationMsg msg = localSession.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + |
| | | "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() + |
| | | "\nAND RECEIVED:\n" + msg.toString()); |
| | | } |
| | | |
| | | // Wrap received message in a server info object |
| | | replServerInfo = ReplicationServerInfo.newInstance(msg); |
| | | |
| | | // Sanity check |
| | | String repDn = replServerInfo.getBaseDn(); |
| | | if (!(this.baseDn.equals(repDn))) |
| | | { |
| | | Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(), |
| | |
| | | * if it is an old replication server). |
| | | */ |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | replServerInfo.getProtocolVersion()); |
| | | localSession.setProtocolVersion(protocolVersion); |
| | | |
| | | |
| | |
| | | error = true; |
| | | } catch (Exception e) |
| | | { |
| | | if ((e instanceof SocketTimeoutException) && debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Timeout trying to connect to RS " + server + |
| | | " for dn: " + baseDn); |
| | |
| | | } |
| | | if (error) |
| | | { |
| | | replServerInfo = null; |
| | | } // Be sure to return null. |
| | | |
| | | } |
| | |
| | | session = localSession; |
| | | } |
| | | |
| | | return replServerInfo; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | // Send our start msg. |
| | | ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg( |
| | | baseDn, 0, 0, 0, 0, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | ProtocolVersion.getCurrentVersion(), this.getGenerationID(), |
| | | isSslEncryption, |
| | | groupId); |
| | | localSession.publish(serverStartECLMsg); |
| | | |
| | | // Read the ReplServerStartMsg that should come back. |
| | |
| | | error = true; |
| | | } catch (Exception e) |
| | | { |
| | | if ((e instanceof SocketTimeoutException) && debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Timeout trying to connect to RS " + server + |
| | | " for dn: " + baseDn); |
| | |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + |
| | | "\nRB HANDSHAKE SENT:\n" + startECLSessionMsg.toString()); |
| | | // + "\nAND RECEIVED:\n" + topologyMsg.toString()); |
| | | // + "\nAND RECEIVED:\n" + topologyMsg.toString()); |
| | | } |
| | | |
| | | // Alright set the timeout to the desired value |
| | |
| | | { |
| | | startSessionMsg = |
| | | new StartSessionMsg( |
| | | initStatus, |
| | | domain.getRefUrls(), |
| | | domain.isAssured(), |
| | | domain.getAssuredMode(), |
| | | domain.getAssuredSdLevel()); |
| | | startSessionMsg.setEclIncludes( |
| | | domain.getEclInclude()); |
| | | } else |
| | | { |
| | | startSessionMsg = |
| | |
| | | |
| | | /** |
| | | * Returns the replication server that best fits our need so that we can |
| | | * connect to it or determine if we must disconnect from current one to |
| | | * re-connect to best server. |
| | | * |
| | | * Note: this method is static for test purpose (access from unit tests) |
| | | * |
| | | * @param firstConnection True if we run this method for the very first |
| | | * connection of the broker. False if we run this method to determine if the |
| | | * replication server we are currently connected to is still the best or not. |
| | | * @param rsServerId The id of the replication server we are currently |
| | | * connected to. Only used when firstConnection is false. |
| | | * @param myState The local server state. |
| | | * @param rsInfos The list of available replication servers and their |
| | | * associated information (choice will be made among them). |
| | | * @param localServerId The server id for the suffix we are working for. |
| | | * @param baseDn The suffix we are working for. |
| | | * @param groupId The groupId we prefer being connected to if possible |
| | | * @param generationId The generation id we are using |
| | | * @return The computed best replication server. If the returned value is |
| | | * null, the best replication server is undetermined but the local server must |
| | | * disconnect (so the best replication server is another one than the current |
| | | * one). Null can only be returned when firstConnection is false. |
| | | */ |
| | | public static ReplicationServerInfo computeBestReplicationServer( |
| | | boolean firstConnection, int rsServerId, ServerState myState, |
| | | Map<Integer, ReplicationServerInfo> rsInfos, int localServerId, |
| | | String baseDn, byte groupId, long generationId) |
| | | { |
| | | |
| | | // Should never happen (sanity check) |
| | | if ((myState == null) || (rsInfos == null) || (rsInfos.size() < 1) || |
| | | (baseDn == null)) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | // Shortcut, if only one server, this is the best |
| | | if (rsInfos.size() == 1) |
| | | { |
| | | return rsInfos.values().iterator().next(); |
| | | } |
| | | |
| | | /** |
| | | * Apply some filtering criteria to determine the best servers list from |
| | | * the available ones. The ordered list of criteria is (from more important |
| | | * to less important): |
| | | * - replication server has the same group id as the local DS one |
| | | * - replication server has the same generation id as the local DS one |
| | | * - replication server is up to date regarding changes generated by the |
| | | * local DS |
| | | * - replication server in the same VM as local DS one |
| | | */ |
| | | Map<Integer, ReplicationServerInfo> bestServers = rsInfos; |
| | | Map<Integer, ReplicationServerInfo> newBestServers; |
| | | // The list of best replication servers is filtered with each criterion. At |
| | | // each level, the list is replaced with the filtered one if the filtering |
| | | // left some servers; otherwise the list is kept as is and the filtering for |
| | | // the next criterion is applied, and so on. |
| | | for (int filterLevel = 1; filterLevel <= 4; filterLevel++) |
| | | { |
| | | newBestServers = null; |
| | | switch (filterLevel) |
| | | { |
| | | case 1: |
| | | // Use only servers locally configured: those are servers declared in |
| | | // the local configuration. When the current method is called, for |
| | | // sure, at least one server from the list is locally configured |
| | | bestServers = filterServersLocallyConfigured(bestServers); |
| | | break; |
| | | case 2: |
| | | // Some servers with same group id ? |
| | | newBestServers = filterServersWithSameGroupId(bestServers, groupId); |
| | | if (newBestServers.size() > 0) |
| | | { |
| | | bestServers = newBestServers; |
| | | } |
| | | break; |
| | | case 3: |
| | | // Some servers with same generation id ? |
| | | newBestServers = filterServersWithSameGenerationId(bestServers, |
| | | generationId); |
| | | if (newBestServers.size() > 0) |
| | | { |
| | | // Ok some servers with the right generation id |
| | | bestServers = newBestServers; |
| | | // If some servers with the right generation id this is useful to |
| | | // run the local DS change criteria |
| | | newBestServers = filterServersWithAllLocalDSChanges(bestServers, |
| | | myState, localServerId); |
| | | if (newBestServers.size() > 0) |
| | | { |
| | | bestServers = newBestServers; |
| | | } |
| | | } |
| | | break; |
| | | case 4: |
| | | // Some servers in the local VM ? |
| | | newBestServers = filterServersInSameVM(bestServers); |
| | | if (newBestServers.size() > 0) |
| | | { |
| | | bestServers = newBestServers; |
| | | } |
| | | break; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Now apply the choice based on the weight to the best servers list |
| | | */ |
| | | if (bestServers.size() > 1) |
| | | { |
| | | if (firstConnection) |
| | | { |
| | | // We are not connected to a server yet |
| | | return computeBestServerForWeight(bestServers, -1, -1); |
| | | } else |
| | | { |
| | | // We are already connected to a RS: compute the best RS as far as the |
| | | // weights are concerned. If this is another one, some DS must |
| | | // disconnect. |
| | | return computeBestServerForWeight(bestServers, rsServerId, |
| | | localServerId); |
| | | } |
| | | } else |
| | | { |
| | | return bestServers.values().iterator().next(); |
| | | } |
| | | } |
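| | | // Illustrative (hypothetical) walk through the filtering levels above, for |
| | | // a first connection with groupId=1 and generationId=100: |
| | | //   RS1: locally configured, group id 1, generation id 100, has the latest |
| | | //        local change |
| | | //   RS2: locally configured, group id 1, generation id 200 |
| | | //   RS3: locally configured, group id 2, generation id 100 |
| | | // Level 1 keeps {RS1, RS2, RS3}; level 2 keeps {RS1, RS2} (same group id); |
| | | // level 3 keeps {RS1} (same generation id and up to date for the local |
| | | // server id). With a single server left, RS1 is returned without invoking |
| | | // the weight-based choice. |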
| | | |
| | | /** |
| | | * Creates a new list that contains only replication servers that are locally |
| | | * configured. |
| | | * @param bestServers The list of replication servers to filter |
| | | * @return The sub list of replication servers locally configured |
| | | */ |
| | | private static Map<Integer, ReplicationServerInfo> |
| | | filterServersLocallyConfigured(Map<Integer, |
| | | ReplicationServerInfo> bestServers) |
| | | { |
| | | Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | for (Integer rsId : bestServers.keySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | if (replicationServerInfo.isLocallyConfigured()) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | } |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | /** |
| | | * Creates a new list that contains only replication servers that have the |
| | | * passed group id, from a passed replication server list. |
| | | * @param bestServers The list of replication servers to filter |
| | | * @param groupId The group id that must match |
| | | * @return The sub list of replication servers matching the requested group id |
| | | * (which may be empty) |
| | | */ |
| | | private static Map<Integer, ReplicationServerInfo> |
| | | filterServersWithSameGroupId(Map<Integer, |
| | | ReplicationServerInfo> bestServers, byte groupId) |
| | | { |
| | | Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | for (Integer rsId : bestServers.keySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | if (replicationServerInfo.getGroupId() == groupId) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | } |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | /** |
| | | * Creates a new list that contains only replication servers that have the |
| | | * passed generation id, from a passed replication server list. |
| | | * @param bestServers The list of replication servers to filter |
| | | * @param generationId The generation id that must match |
| | | * @return The sub list of replication servers matching the requested |
| | | * generation id (which may be empty) |
| | | */ |
| | | private static Map<Integer, ReplicationServerInfo> |
| | | filterServersWithSameGenerationId(Map<Integer, |
| | | ReplicationServerInfo> bestServers, long generationId) |
| | | { |
| | | Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | for (Integer rsId : bestServers.keySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | if (replicationServerInfo.getGenerationId() == generationId) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | } |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | /** |
| | | * Creates a new list that contains only replication servers that have the |
| | | * latest changes from the passed DS, from a passed replication server list. |
| | | * @param bestServers The list of replication servers to filter |
| | | * @param localState The state of the local DS |
| | | * @param localServerId The server id to consider for the changes |
| | | * @return The sub list of replication servers that have the latest changes |
| | | * from the passed DS (which may be empty) |
| | | */ |
| | | private static Map<Integer, ReplicationServerInfo> |
| | | filterServersWithAllLocalDSChanges(Map<Integer, |
| | | ReplicationServerInfo> bestServers, ServerState localState, |
| | | int localServerId) |
| | | { |
| | | Map<Integer, ReplicationServerInfo> upToDateServers = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | Map<Integer, ReplicationServerInfo> moreUpToDateServers = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | // Extract the change number of the latest change generated by the local |
| | | // server |
| | | ChangeNumber myChangeNumber = localState.getMaxChangeNumber(localServerId); |
| | | if (myChangeNumber == null) |
| | | { |
| | | myChangeNumber = new ChangeNumber(0, 0, localServerId); |
| | | } |
| | | |
| | | /** |
| | | * Find replication servers who are up to date (or more up to date than us, |
| | | * if for instance we failed and restarted, having sent some changes to the |
| | | * RS but without having time to store our own state) regarding our own |
| | | * server id. If some servers more up to date, prefer this list. |
| | | */ |
| | | for (Integer rsId : bestServers.keySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | ServerState rsState = replicationServerInfo.getServerState(); |
| | | ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerId); |
| | | if (rsChangeNumber == null) |
| | | { |
| | | rsChangeNumber = new ChangeNumber(0, 0, localServerId); |
| | | } |
| | | |
| | | // Has this replication server the latest local change ? |
| | | if (myChangeNumber.olderOrEqual(rsChangeNumber)) |
| | | { |
| | | if (myChangeNumber.equals(rsChangeNumber)) |
| | | { |
| | | // This replication server has exactly the latest change from the |
| | | // local server |
| | | upToDateServers.put(rsId, replicationServerInfo); |
| | | } else |
| | | { |
| | | // This replication server is even more up to date than the local |
| | | // server |
| | | moreUpToDateServers.put(rsId, replicationServerInfo); |
| | | } |
| | | } |
| | | } |
| | | if (moreUpToDateServers.size() > 0) |
| | | { |
| | | // Prefer servers more up to date than local server |
| | | return moreUpToDateServers; |
| | | } else |
| | | { |
| | | return upToDateServers; |
| | | } |
| | | } |
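| | | // Illustrative (hypothetical) example: if the latest local change has a |
| | | // ChangeNumber time of t=100 for our server id, an RS whose state also |
| | | // shows t=100 goes to upToDateServers, an RS showing t=120 (it saw changes |
| | | // we lost after a restart) goes to moreUpToDateServers, and an RS showing |
| | | // t=80 is dropped. As moreUpToDateServers is not empty, only the RS at |
| | | // t=120 is returned. |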
| | | |
| | | /** |
| | | * Creates a new list that contains only replication servers that are in the |
| | | * same VM as the local DS, from a passed replication server list. |
| | | * @param bestServers The list of replication servers to filter |
| | | * @return The sub list of replication servers being in the same VM as the |
| | | * local DS (which may be empty) |
| | | */ |
| | | private static Map<Integer, ReplicationServerInfo> filterServersInSameVM( |
| | | Map<Integer, ReplicationServerInfo> bestServers) |
| | | { |
| | | Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | for (Integer rsId : bestServers.keySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | if (ReplicationServer.isLocalReplicationServer( |
| | | replicationServerInfo.getServerURL())) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | } |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | /** |
| | | * Computes the best replication server the local server should be connected |
| | | * to so that the load is correctly spread across the topology, following the |
| | | * weights guidance. |
| | | * Warning: This method is expected to be called with at least 2 servers in |
| | | * bestServers |
| | | * Note: this method is static for test purpose (access from unit tests) |
| | | * @param bestServers The list of replication servers to consider |
| | | * @param currentRsServerId The replication server the local server is |
| | | * currently connected to. -1 if the local server is not yet connected |
| | | * to any replication server. |
| | | * @param localServerId The server id of the local server. This is not used |
| | | * when it is not connected to a replication server |
| | | * (currentRsServerId = -1) |
| | | * @return The replication server the local server should be connected to |
| | | * as far as the weight is concerned. This may be the currently used one if |
| | | * the weight is correctly spread. If the returned value is null, the best |
| | | * replication server is undetermined but the local server must disconnect |
| | | * (so the best replication server is another one than the current one). |
| | | */ |
| | | public static ReplicationServerInfo computeBestServerForWeight( |
| | | Map<Integer, ReplicationServerInfo> bestServers, int currentRsServerId, |
| | | int localServerId) |
| | | { |
| | | /* |
| | | * - Compute the load goal of each RS, deducing it from the weights assigned |
| | | * to them. |
| | | * - Compute the current load of each RS, deducing it from the DSs |
| | | * currently connected to them. |
| | | * - Compute the differences between the load goals and the current loads of |
| | | * the RSs. |
| | | */ |
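| | | // Worked example with hypothetical figures: RS1 has weight 2 and RS2 has |
| | | // weight 1 (sum of weights = 3), and 3 DSs are connected, 1 to RS1 and 2 to |
| | | // RS2. Load goals are 2/3 and 1/3, current loads are 1/3 and 2/3, so the |
| | | // load distances computed below are +1/3 for RS1 (it lacks DSs) and -1/3 |
| | | // for RS2 (it is overloaded). |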
| | | // Sum of the weights |
| | | int sumOfWeights = 0; |
| | | // Sum of the connected DSs |
| | | int sumOfConnectedDSs = 0; |
| | | for (ReplicationServerInfo replicationServerInfo : bestServers.values()) |
| | | { |
| | | sumOfWeights += replicationServerInfo.getWeight(); |
| | | sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber(); |
| | | } |
| | | // Distance (difference) of the current loads to the load goals of each RS: |
| | | // key:server id, value: distance |
| | | Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>(); |
| | | // Precision for the operations (number of digits after the dot) |
| | | // Default value of rounding method is HALF_UP for |
| | | // the MathContext |
| | | MathContext mathContext = new MathContext(32); |
| | | for (Integer rsId : bestServers.keySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | |
| | | int rsWeight = replicationServerInfo.getWeight(); |
| | | // load goal = rs weight / sum of weights |
| | | BigDecimal loadGoalBd = (new BigDecimal(rsWeight)).divide( |
| | | new BigDecimal(sumOfWeights), mathContext); |
| | | BigDecimal currentLoadBd = BigDecimal.ZERO; |
| | | if (sumOfConnectedDSs != 0) |
| | | { |
| | | // current load = number of connected DSs / total number of DSs |
| | | int connectedDSs = replicationServerInfo.getConnectedDSNumber(); |
| | | currentLoadBd = (new BigDecimal(connectedDSs)).divide( |
| | | new BigDecimal(sumOfConnectedDSs), mathContext); |
| | | } |
| | | // load distance = load goal - current load |
| | | BigDecimal loadDistanceBd = |
| | | loadGoalBd.subtract(currentLoadBd, mathContext); |
| | | loadDistances.put(rsId, loadDistanceBd); |
| | | } |
| | | |
| | | if (currentRsServerId == -1) |
| | | { |
| | | // The local server is not connected yet |
| | | |
| | | /* |
| | | * Find the server with the current highest distance to its load goal and |
| | | * choose it. Make an exception if every server is correctly balanced, |
| | | * that is every current load distances are equal to 0, in that case, |
| | | * choose the server with the highest weight |
| | | */ |
| | | int bestRsId = 0; // If all servers are equal, return the first one |
| | | float highestDistance = Float.NEGATIVE_INFINITY; |
| | | boolean allRsWithZeroDistance = true; |
| | | int highestWeightRsId = -1; |
| | | int highestWeight = -1; |
| | | for (Integer rsId : bestServers.keySet()) |
| | | { |
| | | float loadDistance = loadDistances.get(rsId).floatValue(); |
| | | if (loadDistance > highestDistance) |
| | | { |
| | | // This server is further away from its balance point |
| | | bestRsId = rsId; |
| | | highestDistance = loadDistance; |
| | | } |
| | | if (loadDistance != (float)0) |
| | | { |
| | | allRsWithZeroDistance = false; |
| | | } |
| | | int weight = bestServers.get(rsId).getWeight(); |
| | | if (weight > highestWeight) |
| | | { |
| | | // This server has a higher weight |
| | | highestWeightRsId = rsId; |
| | | highestWeight = weight; |
| | | } |
| | | } |
| | | // All servers with a 0 distance ? |
| | | if (allRsWithZeroDistance) |
| | | { |
| | | // Choose the server with the highest weight |
| | | bestRsId = highestWeightRsId; |
| | | } |
| | | return bestServers.get(bestRsId); |
| | | } else |
| | | { |
| | | // The local server is currently connected to a RS, let's see if it must |
| | | // disconnect or not, taking the weights into account. |
| | | |
| | | float currentLoadDistance = |
| | | loadDistances.get(currentRsServerId).floatValue(); |
| | | if (currentLoadDistance < (float) 0) |
| | | { |
| | | // Too many DSs are connected to the current RS compared with its load |
| | | // goal: |
| | | // Determine the potential number of DSs to disconnect from the current |
| | | // RS and see if the local DS is one of them: the DSs that must |
| | | // disconnect are those with the lowest server ids. |
| | | // Compute the sum of the load distances of the other RSs |
| | | BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO; |
| | | for (Integer rsId : bestServers.keySet()) |
| | | { |
| | | if (rsId != currentRsServerId) |
| | | { |
| | | sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add( |
| | | loadDistances.get(rsId), mathContext); |
| | | } |
| | | } |
| | | |
| | | if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > (float) 0) |
| | | { |
| | | // The average distance of the other RSs shows a lack of DSs. |
| | | // Compute the number of DSs to disconnect from the current RS, |
| | | // rounding to the nearest integer. Only do this if there is no risk |
| | | // of a yoyo effect: when the exact balance cannot be established |
| | | // because of the current number of connected DSs, do not disconnect |
| | | // a DS. A simple example where the balance cannot be reached is: |
| | | // - RS1 has weight 1 and 2 DSs |
| | | // - RS2 has weight 1 and 1 DS |
| | | // => disconnecting a DS from RS1 to reconnect it to RS2 would make no |
| | | // sense as this would just lead to the reverse situation. In that |
| | | // case, the perfect balance cannot be reached and we must stick to |
| | | // the current situation, otherwise the DS would keep moving between |
| | | // the 2 RSs. |
| | | float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd. |
| | | multiply(new BigDecimal(sumOfConnectedDSs), mathContext). |
| | | floatValue(); |
| | | int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber); |
| | | |
| | | // Avoid yoyo effect |
| | | if (overloadingDSsNumber == 1) |
| | | { |
| | | // What would be the new load distance for the current RS if |
| | | // we disconnect some DSs ? |
| | | ReplicationServerInfo currentReplicationServerInfo = |
| | | bestServers.get(currentRsServerId); |
| | | |
| | | int currentRsWeight = currentReplicationServerInfo.getWeight(); |
| | | BigDecimal currentRsWeightBd = new BigDecimal(currentRsWeight); |
| | | BigDecimal sumOfWeightsBd = new BigDecimal(sumOfWeights); |
| | | BigDecimal currentRsLoadGoalBd = |
| | | currentRsWeightBd.divide(sumOfWeightsBd, mathContext); |
| | | BigDecimal potentialCurrentRsNewLoadBd = new BigDecimal(0); |
| | | if (sumOfConnectedDSs != 0) |
| | | { |
| | | int connectedDSs = currentReplicationServerInfo. |
| | | getConnectedDSNumber(); |
| | | BigDecimal potentialNewConnectedDSsBd = |
| | | new BigDecimal(connectedDSs - 1); |
| | | BigDecimal sumOfConnectedDSsBd = |
| | | new BigDecimal(sumOfConnectedDSs); |
| | | potentialCurrentRsNewLoadBd = |
| | | potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd, |
| | | mathContext); |
| | | } |
| | | BigDecimal potentialCurrentRsNewLoadDistanceBd = |
| | | currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd, |
| | | mathContext); |
| | | |
| | | // What would be the new load distance for the other RSs ? |
| | | BigDecimal additionalDsLoadBd = |
| | | (new BigDecimal(1)).divide( |
| | | new BigDecimal(sumOfConnectedDSs), mathContext); |
| | | BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd = |
| | | sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd, |
| | | mathContext); |
| | | |
| | | // Now compare both values: we must not disconnect the DS if this |
| | | // would lead to a situation where the load distance of the other |
| | | // RSs is the opposite of the future load distance of the local RS, |
| | | // otherwise we would conclude that we should disconnect again just |
| | | // after arriving at the new RS. But we should disconnect if this |
| | | // reaches the perfect balance (both values are 0). |
| | | MathContext roundMc = |
| | | new MathContext(6, RoundingMode.DOWN); |
| | | BigDecimal potentialCurrentRsNewLoadDistanceBdRounded = |
| | | potentialCurrentRsNewLoadDistanceBd.round(roundMc); |
| | | BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded = |
| | | potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc); |
| | | |
| | | if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo( |
| | | BigDecimal.ZERO) != 0) |
| | | && (potentialCurrentRsNewLoadDistanceBdRounded.equals( |
| | | potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate()))) |
| | | { |
| | | // Avoid the yoyo effect, and keep the local DS connected to its |
| | | // current RS |
| | | return bestServers.get(currentRsServerId); |
| | | } |
| | | } |
| | | |
| | | // Prepare a sorted list (from lowest to highest) of the DS server ids |
| | | // connected to the current RS |
| | | ReplicationServerInfo currentRsInfo = |
| | | bestServers.get(currentRsServerId); |
| | | List<Integer> serversConnectedToCurrentRS = |
| | | currentRsInfo.getConnectedDSs(); |
| | | List<Integer> sortedServers = new ArrayList<Integer>( |
| | | serversConnectedToCurrentRS); |
| | | Collections.sort(sortedServers); |
| | | |
| | | // Go through the list of DSs to disconnect and see if the local |
| | | // server is part of them. |
| | | int index = 0; |
| | | while (overloadingDSsNumber > 0) |
| | | { |
| | | int serverToDisconnectId = sortedServers.get(index); |
| | | if (serverToDisconnectId == localServerId) |
| | | { |
| | | // The local server is part of the DSs to disconnect |
| | | return null; |
| | | } |
| | | overloadingDSsNumber--; |
| | | index++; |
| | | } |
| | | |
| | | // The local server is not part of the servers to disconnect from the |
| | | // current RS. |
| | | return bestServers.get(currentRsServerId); |
| | | } else |
| | | { |
| | | // The average distance of the other RSs does not show a lack of DSs: |
| | | // no need to disconnect any DS from the current RS. |
| | | return bestServers.get(currentRsServerId); |
| | | } |
| | | } else |
| | | { |
| | | // The RS load goal is reached or there are not enough DSs connected to |
| | | // it to reach it: do not disconnect from this RS and return rsInfo for |
| | | // this RS |
| | | return bestServers.get(currentRsServerId); |
| | | } |
| | | } |
| | | } |
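| | | |
| | | /* |
| | | * A minimal usage sketch, assuming two hypothetical RSs (server ids 11 |
| | | * and 12) whose ReplicationServerInfo objects were built from received |
| | | * topology information: RS 11 has weight 2 and 3 connected DSs, RS 12 |
| | | * has weight 1 and no connected DS. For a local DS (server id 1) that is |
| | | * not yet connected: |
| | | * |
| | | *   ReplicationServerInfo best = |
| | | *       computeBestServerForWeight(bestServers, -1, 1); |
| | | * |
| | | * RS 12 then has the highest load distance (+1/3 versus -1/3 for RS 11), |
| | | * so the entry for RS 12 would be returned. |
| | | */ |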
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Starts the same group id poller. |
| | | */ |
| | | private void startSameGroupIdPoller() |
| | | { |
| | | sameGroupIdPoller = new SameGroupIdPoller(); |
| | | sameGroupIdPoller.start(); |
| | | } |
| | | |
| | | /** |
| | | * Stops the same group id poller. |
| | | */ |
| | | private synchronized void stopSameGroupIdPoller() |
| | | { |
| | | if (sameGroupIdPoller != null) |
| | | { |
| | | sameGroupIdPoller.shutdown(); |
| | | sameGroupIdPoller.waitForShutdown(); |
| | | sameGroupIdPoller = null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Stop the heartbeat monitor thread. |
| | | */ |
| | | synchronized void stopRSHeartBeatMonitoring() |
| | |
| | | * Receive a message. |
| | | * This method is not multithread safe and should either always be |
| | | * called in a single thread or be protected by a locking mechanism |
| | | * before being called. This is a wrapper around the version of this |
| | | * method that takes a boolean, so that existing tests do not have to |
| | | * be modified. |
| | | * |
| | | * @return the received message |
| | | * @throws SocketTimeoutException if the timeout set by setSoTimeout |
| | |
| | | */ |
| | | public ReplicationMsg receive() throws SocketTimeoutException |
| | | { |
| | | return receive(false); |
| | | } |
| | | |
| | | /** |
| | | * Receive a message. |
| | | * This method is not multithread safe and should either always be |
| | | * called in a single thread or protected by a locking mechanism |
| | | * before being called. |
| | | * |
| | | * @return the received message |
| | | * @throws SocketTimeoutException if the timeout set by setSoTimeout |
| | | * has expired |
| | | * @param allowReconnectionMechanism If true, this allows the reconnection |
| | | * mechanism to disconnect the broker if it detects that it should reconnect |
| | | * to another replication server, according to the criteria used by the |
| | | * algorithm that chooses a suitable replication server. |
| | | */ |
| | | public ReplicationMsg receive(boolean allowReconnectionMechanism) |
| | | throws SocketTimeoutException |
| | | { |
| | | while (shutdown == false) |
| | | { |
| | | if (!connected) |
| | |
| | | { |
| | | WindowMsg windowMsg = (WindowMsg) msg; |
| | | sendWindow.release(windowMsg.getNumAck()); |
| | | } else if (msg instanceof TopologyMsg) |
| | | { |
| | | TopologyMsg topoMsg = (TopologyMsg) msg; |
| | | receiveTopo(topoMsg); |
| | | if (allowReconnectionMechanism) |
| | | { |
| | | // Reset wait time before next computation of best server |
| | | mustRunBestServerCheckingAlgorithm = 0; |
| | | } |
| | | } else if (msg instanceof StopMsg) |
| | | { |
| | | /* |
| | | * RS performs a proper disconnection |
| | |
| | | logError(message); |
| | | // Try to find a suitable RS |
| | | this.reStart(failingSession); |
| | | } else if (msg instanceof MonitorMsg) |
| | | { |
| | | // This is the response to a MonitorRequest that was sent earlier or |
| | | // the regular message of the monitoring publisher of the RS. |
| | |
| | | monitorResponse.notify(); |
| | | } |
| | | |
| | | // Extract and store replication servers ServerStates |
| | | rsStates = new HashMap<Integer, ServerState>(); |
| | | // Update the replication servers ServerStates with new received info |
| | | it = monitorMsg.rsIterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | int srvId = it.next(); |
| | | rsStates.put(srvId, monitorMsg.getRSServerState(srvId)); |
| | | ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId); |
| | | if (rsInfo != null) |
| | | { |
| | | rsInfo.update(monitorMsg.getRSServerState(srvId)); |
| | | } |
| | | } |
| | | |
| | | // Now if it is allowed, compute the best replication server to see if |
| | | // it is still the one we are currently connected to. If not, |
| | | // disconnect properly and let the connection algorithm reconnect to |
| | | // the best replication server |
| | | if (allowReconnectionMechanism) |
| | | { |
| | | mustRunBestServerCheckingAlgorithm++; |
| | | if (mustRunBestServerCheckingAlgorithm == 2) |
| | | { |
| | | // Stable topology (no topology message for a few seconds): proceed |
| | | // with the best server checking. |
| | | ReplicationServerInfo bestServerInfo = |
| | | computeBestReplicationServer(false, rsServerId, state, |
| | | replicationServerInfos, serverId, baseDn, groupId, |
| | | generationID); |
| | | |
| | | if ((bestServerInfo == null) || |
| | | (bestServerInfo.getServerId() != rsServerId)) |
| | | { |
| | | // The best replication server is no longer the one we are |
| | | // currently using. Disconnect properly, then reconnect. |
| | | Message message = |
| | | NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(), |
| | | Integer.toString(serverId), |
| | | Integer.toString(rsServerId), |
| | | rsServerUrl); |
| | | logError(message); |
| | | reStart(); |
| | | } |
| | | |
| | | // Reset wait time before next computation of best server |
| | | mustRunBestServerCheckingAlgorithm = 0; |
| | | } |
| | | } |
| | | } else |
| | | { |
| | | return msg; |
| | | } |
| | |
| | | if (shutdown == false) |
| | | { |
| | | if ((session == null) || (!session.closeInitiated())) |
| | | |
| | | { |
| | | /* |
| | | * We did not initiate the close on our side, log an error message. |
| | | */ |
| | | Message message = |
| | | ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer, |
| | | Integer.toString(rsServerId), baseDn.toString(), |
| | | Integer.toString(serverId)); |
| | | logError(message); |
| | | } |
| | | this.reStart(failingSession); |
| | |
| | | } |
| | | } |
| | | } catch (InterruptedException e) |
| | | { |
| | | } |
| | | return replicaStates; |
| | | } |
| | | |
| | |
| | | { |
| | | try |
| | | { |
| | | updateDoneCount++; |
| | | if ((updateDoneCount >= halfRcvWindow) && (session != null)) |
| | | { |
| | | session.publish(new WindowMsg(updateDoneCount)); |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker " + serverId + " is stopping and will" + |
| | | " close the connection to replication server " + rsServerId + " for" |
| | | + " domain " + baseDn); |
| | | " close the connection to replication server " + rsServerId + " for" + |
| | | " domain " + baseDn); |
| | | } |
| | | stopSameGroupIdPoller(); |
| | | stopRSHeartBeatMonitoring(); |
| | | stopChangeTimeHeartBeatPublishing(); |
| | | replicationServer = "stopped"; |
| | |
| | | * @param groupId The new group id to use |
| | | */ |
| | | public boolean changeConfig( |
| | | Collection<String> replicationServers, int window, long heartbeatInterval, |
| | | byte groupId) |
| | | { |
| | | // These parameters need to be renegotiated with the ReplicationServer |
| | | // so if they have changed, that requires restarting the session with |
| | |
| | | // A new session is necessary only when information regarding |
| | | // the connection is modified |
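| | | // For example (hypothetical values): keeping the same server list, |
| | | // heartbeat interval and group id but changing the window size from 100 |
| | | // to 200 requires a new session, whereas calling this method again with |
| | | // exactly the same values does not. |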
| | | if ((servers == null) || |
| | | (!(replicationServers.size() == servers.size() && replicationServers. |
| | | containsAll(servers))) || |
| | | window != this.maxRcvWindow || |
| | | heartbeatInterval != this.heartbeatInterval || |
| | | (groupId != this.groupId)) |
| | | { |
| | | needToRestartSession = true; |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * In case we are connected to a RS with a different group id, we use this |
| | | * thread to poll for the presence of a RS with the same group id as ours. |
| | | * If a RS with the same group id is available, we close the session to |
| | | * force reconnection. Reconnection will choose a server with the same |
| | | * group id. |
| | | */ |
| | | private class SameGroupIdPoller extends DirectoryThread |
| | | { |
| | | |
| | | private boolean sameGroupIdPollershutdown = false; |
| | | private boolean terminated = false; |
| | | // Sleep interval in ms |
| | | private static final int SAME_GROUP_ID_POLLER_PERIOD = 5000; |
| | | |
| | | public SameGroupIdPoller() |
| | | { |
| | | super("Replication Broker Same Group Id Poller for " + baseDn.toString() + |
| | | " and group id " + groupId + " in server id " + serverId); |
| | | } |
| | | |
| | | /** |
| | | * Wait for the completion of the same group id poller. |
| | | */ |
| | | public void waitForShutdown() |
| | | { |
| | | try |
| | | { |
| | | while (terminated == false) |
| | | { |
| | | Thread.sleep(50); |
| | | } |
| | | } catch (InterruptedException e) |
| | | { |
| | | // exit the loop if this thread is interrupted. |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the same group id poller. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | sameGroupIdPollershutdown = true; |
| | | } |
| | | |
| | | /** |
| | | * Continuously look for a RS with our group id and, if one is found, break |
| | | * the current connection to force reconnection to a server with the right |
| | | * group id. |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | boolean done = false; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() + |
| | | " started."); |
| | | } |
| | | |
| | | while ((!done) && (!sameGroupIdPollershutdown)) |
| | | { |
| | | // Sleep some time between checks |
| | | try |
| | | { |
| | | Thread.sleep(SAME_GROUP_ID_POLLER_PERIOD); |
| | | } catch (InterruptedException e) |
| | | { |
| | | // Stop as we are interrupted |
| | | sameGroupIdPollershutdown = true; |
| | | } |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Running SameGroupIdPoller for: " + |
| | | baseDn.toString()); |
| | | } |
| | | if (session != null) // Check only if not already disconnected |
| | | |
| | | { |
| | | for (String server : servers) |
| | | { |
| | | // Do not ask the RS we are connected to, as it certainly has the |
| | | // wrong group id |
| | | if (server.equals(rsServerUrl)) |
| | | continue; |
| | | |
| | | // Connect to server and get reply message |
| | | ServerInfo serverInfo = |
| | | performPhaseOneHandshake(server, false); |
| | | |
| | | // Is it a server with our group id ? |
| | | if (serverInfo != null) |
| | | { |
| | | if (groupId == serverInfo.getGroupId()) |
| | | { |
| | | // Found one server with the same group id as us, disconnect |
| | | // session to force reconnection to a server with same group |
| | | // id. |
| | | Message message = NOTE_NEW_SERVER_WITH_SAME_GROUP_ID.get( |
| | | Byte.toString(groupId), baseDn.toString(), |
| | | Integer.toString(serverId)); |
| | | logError(message); |
| | | |
| | | if (protocolVersion >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // V4 protocol introduces a StopMsg to properly end |
| | | // communications |
| | | try |
| | | { |
| | | session.publish(new StopMsg()); |
| | | } catch (IOException ioe) |
| | | { |
| | | // Anyway, going to close session, so nothing to do |
| | | } |
| | | } |
| | | try |
| | | { |
| | | session.close(); |
| | | } catch (Exception e) |
| | | { |
| | | // The session was already closed, just ignore. |
| | | } |
| | | session = null; |
| | | done = true; // Terminate the thread, as it has done its job. |
| | | |
| | | break; |
| | | } |
| | | } |
| | | } // for server |
| | | |
| | | } |
| | | } |
| | | } |
| | | |
| | | terminated = true; |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() + |
| | | " terminated."); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Signals the RS we just entered a new status. |
| | | * @param newStatus The status the local DS just entered |
| | | */ |
| | |
| | | */ |
| | | public List<RSInfo> getRsList() |
| | | { |
| | | List<RSInfo> result = new ArrayList<RSInfo>(); |
| | | |
| | | for (ReplicationServerInfo replicationServerInfo : |
| | | replicationServerInfos.values()) |
| | | { |
| | | result.add(replicationServerInfo.toRSInfo()); |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | /** |
| | | * Computes the list of DSs connected to a particular RS. |
| | | * @param rsId The RS id of the server whose connected DSs are wanted |
| | | * @param dsList The list of DSInfo objects to compute from |
| | | * @return The list of DSs connected to the server with id rsId |
| | | */ |
| | | private List<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList) |
| | | { |
| | | List<Integer> connectedDSs = new ArrayList<Integer>(); |
| | | |
| | | if (rsServerId == rsId) |
| | | { |
| | | // If we are computing the connected DSs for the RS we are connected |
| | | // to, we should count the local DS, as the DSInfo of the local DS is not |
| | | // sent by the replication server in the topology message. We must count |
| | | // ourselves as a connected server. |
| | | connectedDSs.add(serverId); |
| | | } |
| | | |
| | | for (DSInfo dsInfo : dsList) |
| | | { |
| | | if (dsInfo.getRsId() == rsId) |
| | | connectedDSs.add(dsInfo.getDsId()); |
| | | } |
| | | |
| | | return connectedDSs; |
| | | } |
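| | | |
| | | /* |
| | | * Illustrative example with hypothetical ids: if the local DS (server id |
| | | * 1) is connected to RS 21, and dsList reports DS 2 on RS 21 and DS 3 on |
| | | * RS 22, then computeConnectedDSs(21, dsList) returns [1, 2] while |
| | | * computeConnectedDSs(22, dsList) returns [3]. |
| | | */ |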
| | | |
| | | /** |
| | |
| | | */ |
| | | public void receiveTopo(TopologyMsg topoMsg) |
| | | { |
| | | // Store new DS list |
| | | dsList = topoMsg.getDsList(); |
| | | |
| | | // Update replication server info list with the received topology |
| | | // information |
| | | List<Integer> rsToKeepList = new ArrayList<Integer>(); |
| | | for (RSInfo rsInfo : topoMsg.getRsList()) |
| | | { |
| | | int rsId = rsInfo.getId(); |
| | | rsToKeepList.add(rsId); // Mark this server as still existing |
| | | List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList); |
| | | ReplicationServerInfo replicationServerInfo = |
| | | replicationServerInfos.get(rsId); |
| | | if (replicationServerInfo == null) |
| | | { |
| | | // New replication server, create info for it and add it to the list |
| | | replicationServerInfo = |
| | | new ReplicationServerInfo(rsInfo, connectedDSs); |
| | | // Set the locally configured flag for this new RS only if it is |
| | | // configured |
| | | updateRSInfoLocallyConfiguredStatus(replicationServerInfo); |
| | | replicationServerInfos.put(rsId, replicationServerInfo); |
| | | } else |
| | | { |
| | | // Update the existing info for the replication server |
| | | replicationServerInfo.update(rsInfo, connectedDSs); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Now remove any replication server that may have disappeared from the |
| | | * topology. |
| | | */ |
| | | Iterator<Entry<Integer, ReplicationServerInfo>> rsInfoIt = |
| | | replicationServerInfos.entrySet().iterator(); |
| | | while (rsInfoIt.hasNext()) |
| | | { |
| | | Entry<Integer, ReplicationServerInfo> rsInfoEntry = rsInfoIt.next(); |
| | | if (!rsToKeepList.contains(rsInfoEntry.getKey())) |
| | | { |
| | | // This replication server has quit the topology, remove it from the |
| | | // list |
| | | rsInfoIt.remove(); |
| | | } |
| | | } |
| | | if (domain != null) |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Check whether the broker could not find any Replication Server and |
| | | * therefore the connection attempt failed. |
| | |
| | | { |
| | | ctHeartbeatPublisherThread = |
| | | new CTHeartbeatPublisherThread( |
| | | "Replication CN Heartbeat sender for " + |
| | | baseDn + " with " + getReplicationServer(), |
| | | session, changeTimeHeartbeatSendInterval, serverId); |
| | | "Replication CN Heartbeat sender for " + |
| | | baseDn + " with " + getReplicationServer(), |
| | | session, changeTimeHeartbeatSendInterval, serverId); |
| | | ctHeartbeatPublisherThread.start(); |
| | | } else |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(this + |
| | | " is not configured to send CN heartbeat interval"); |
| | | " is not configured to send CN heartbeat interval"); |
| | | } |
| | | } |
| | | |