/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2006-2010 Sun Microsystems, Inc. */ package org.opends.server.replication.service; import java.net.UnknownHostException; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.io.IOException; import java.math.BigDecimal; import java.math.MathContext; import java.math.RoundingMode; import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.MessageBuilder; 
import org.opends.messages.Severity; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.DSInfo; import org.opends.server.replication.common.MutableBoolean; import org.opends.server.replication.common.RSInfo; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.ChangeStatusMsg; import org.opends.server.replication.protocol.HeartbeatMonitor; import org.opends.server.replication.protocol.MonitorMsg; import org.opends.server.replication.protocol.MonitorRequestMsg; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplServerStartDSMsg; import org.opends.server.replication.protocol.ReplServerStartMsg; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.protocol.ServerStartECLMsg; import org.opends.server.replication.protocol.ServerStartMsg; import org.opends.server.replication.protocol.StartECLSessionMsg; import org.opends.server.replication.protocol.StartSessionMsg; import org.opends.server.replication.protocol.StopMsg; import org.opends.server.replication.protocol.TopologyMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.protocol.WindowMsg; import org.opends.server.replication.protocol.WindowProbeMsg; import org.opends.server.util.ServerConstants; import org.opends.server.replication.server.ReplicationServer; /** * The broker for Multi-master Replication. */ public class ReplicationBroker { /** * The tracer object for the debug logger. 
*/ private static final DebugTracer TRACER = getTracer(); private boolean shutdown = false; private Collection servers; private boolean connected = false; private String replicationServer = "Not connected"; private ProtocolSession session = null; private final ServerState state; private final String baseDn; private final int serverId; private Semaphore sendWindow; private int maxSendWindow; private int rcvWindow = 100; private int halfRcvWindow = rcvWindow / 2; private int maxRcvWindow = rcvWindow; private int timeout = 0; private short protocolVersion; private ReplSessionSecurity replSessionSecurity; // My group id private byte groupId = (byte) -1; // The group id of the RS we are connected to private byte rsGroupId = (byte) -1; // The server id of the RS we are connected to private Integer rsServerId = -1; // The server URL of the RS we are connected to private String rsServerUrl = null; // Our replication domain private ReplicationDomain domain = null; /** * This object is used as a conditional event to be notified about * the reception of monitor information from the Replication Server. */ private final MutableBoolean monitorResponse = new MutableBoolean(false); /** * A Map containing the ServerStates of all the replicas in the topology * as seen by the ReplicationServer the last time it was polled or the last * time it published monitoring information. */ private HashMap replicaStates = new HashMap(); /** * The expected duration in milliseconds between heartbeats received * from the replication server. Zero means heartbeats are off. */ private long heartbeatInterval = 0; /** * A thread to monitor heartbeats on the session. */ private HeartbeatMonitor heartbeatMonitor = null; /** * The number of times the connection was lost. */ private int numLostConnections = 0; /** * When the broker cannot connect to any replication server * it logs an error and keeps retrying every second. 
* This boolean is set when the first failure happens and is used * to avoid repeating the error message for further failure to connect * and to know that it is necessary to print a new message when the broker * finally succeed to connect. */ private boolean connectionError = false; private final Object connectPhaseLock = new Object(); /** * The thread that publishes messages to the RS containing the current * change time of this DS. */ private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null; /** * The expected period in milliseconds between two of these messages sent * to the replication server. Zero means heartbeats are off. */ private long changeTimeHeartbeatSendInterval = 0; /* * Properties for the last topology info received from the network. */ // Info for other DSs. // Warning: does not contain info for us (for our server id) private List dsList = new ArrayList(); private long generationID; private int updateDoneCount = 0; private boolean connectRequiresRecovery = false; /** * The map of replication server info initialized at connection time and * regularly updated. This is used to decide to which best suitable * replication server one wants to connect. * Key: replication server id * Value: replication server info for the matching replication server id */ private Map replicationServerInfos = null; /** * This integer defines when the best replication server checking algorithm * should be engaged. * Every time a monitoring message (each monitoring publisher period) is * received, it is incremented. When it reaches 2, we run the checking * algorithm to see if we must reconnect to another best replication server. * Then we reset the value to 0. But when a topology message is received, the * integer is reset to 0. This ensures that we wait at least one monitoring * publisher period before running the algorithm, but also that we wait at * least for a monitoring period after the last received topology message * (topology stabilization). 
*/ private int mustRunBestServerCheckingAlgorithm = 0; /** * Creates a new ReplicationServer Broker for a particular ReplicationDomain. * * @param replicationDomain The replication domain that is creating us. * @param state The ServerState that should be used by this broker * when negotiating the session with the replicationServer. * @param baseDn The base DN that should be used by this broker * when negotiating the session with the replicationServer. * @param serverID2 The server ID that should be used by this broker * when negotiating the session with the replicationServer. * @param window The size of the send and receive window to use. * @param generationId The generationId for the server associated to the * provided serverId and for the domain associated to the provided baseDN. * @param heartbeatInterval The interval (in ms) between heartbeats requested * from the replicationServer, or zero if no heartbeats are requested. * @param replSessionSecurity The session security configuration. * @param groupId The group id of our domain. * @param changeTimeHeartbeatInterval The interval (in ms) between Change * time heartbeats are sent to the RS, * or zero if no CN heartbeat should be sent. */ public ReplicationBroker(ReplicationDomain replicationDomain, ServerState state, String baseDn, int serverID2, int window, long generationId, long heartbeatInterval, ReplSessionSecurity replSessionSecurity, byte groupId, long changeTimeHeartbeatInterval) { this.domain = replicationDomain; this.baseDn = baseDn; this.serverId = serverID2; this.state = state; this.protocolVersion = ProtocolVersion.getCurrentVersion(); this.replSessionSecurity = replSessionSecurity; this.groupId = groupId; this.generationID = generationId; this.heartbeatInterval = heartbeatInterval; this.maxRcvWindow = window; this.maxRcvWindow = window; this.halfRcvWindow = window / 2; this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval; } /** * Start the ReplicationBroker. 
*/ public void start() { shutdown = false; this.rcvWindow = this.maxRcvWindow; this.connect(); } /** * Start the ReplicationBroker using the provided replication servers. * * @param servers The replication servers this broker may connect to; * a message is logged when the collection is empty */ public void start(Collection servers) { /* * Open Socket to the ReplicationServer * Send the Start message */ shutdown = false; this.servers = servers; if (servers.size() < 1) { Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get(); logError(message); } this.rcvWindow = this.maxRcvWindow; this.connect(); } /** * Gets the group id of the RS we are connected to. * @return The group id of the RS we are connected to */ public byte getRsGroupId() { return rsGroupId; } /** * Gets the server id of the RS we are connected to. * @return The server id of the RS we are connected to */ public Integer getRsServerId() { return rsServerId; } /** * Gets the server id. * @return The server id */ public int getServerId() { return serverId; } /** * Gets the generation id, refreshing it from the replication domain when * one is attached. * @return The generation id */ private long getGenerationID() { if (domain != null) { // Update the generation id generationID = domain.getGenerationID(); } return generationID; } /** * Set the generation id - for test purpose. * @param generationID The generation id */ public void setGenerationID(long generationID) { this.generationID = generationID; } /** * Gets the server url of the RS we are connected to. * @return The server url of the RS we are connected to */ public String getRsServerUrl() { return rsServerUrl; } /** * Sets the locally configured flag for the passed ReplicationServerInfo * object, analyzing the local configuration. 
* @param */ private void updateRSInfoLocallyConfiguredStatus( ReplicationServerInfo replicationServerInfo) { // Determine if the passed ReplicationServerInfo has a URL that is present // in the locally configured replication servers String rsUrl = replicationServerInfo.getServerURL(); if (rsUrl == null) { // The ReplicationServerInfo has been generated from a server with // no URL in TopologyMsg (i.e: with replication protocol version < 4): // ignore this server as we do not know how to connect to it replicationServerInfo.setLocallyConfigured(false); return; } for (String serverUrl : servers) { if (isSameReplicationServerUrl(serverUrl, rsUrl)) { // This RS is locally configured, mark this replicationServerInfo.setLocallyConfigured(true); return; } } replicationServerInfo.setLocallyConfigured(false); } /** * Compares 2 replication servers addresses and returns true if they both * represent the same replication server instance. * @param rs1Url Replication server 1 address * @param rs2Url Replication server 2 address * @return True if both replication server addresses represent the same * replication server instance, false otherwise. 
*/ private static boolean isSameReplicationServerUrl(String rs1Url, String rs2Url) { // Get and compare ports of RS1 and RS2 int separator1 = rs1Url.lastIndexOf(':'); if (separator1 < 0) { // Not a RS url: should not happen return false; } int rs1Port = Integer.parseInt(rs1Url.substring(separator1 + 1)); int separator2 = rs2Url.lastIndexOf(':'); if (separator2 < 0) { // Not a RS url: should not happen return false; } int rs2Port = Integer.parseInt(rs2Url.substring(separator2 + 1)); if (rs1Port != rs2Port) { return false; } // Get and compare addresses of RS1 and RS2 String rs1 = rs1Url.substring(0, separator1); InetAddress[] rs1Addresses = null; try { if (rs1.equals("localhost") || rs1.equals("127.0.0.1")) { // Replace localhost with the local official hostname rs1 = InetAddress.getLocalHost().getHostName(); } rs1Addresses = InetAddress.getAllByName(rs1); } catch (UnknownHostException ex) { // Unknown RS: should not happen return false; } String rs2 = rs2Url.substring(0, separator2); InetAddress[] rs2Addresses = null; try { if (rs2.equals("localhost") || rs2.equals("127.0.0.1")) { // Replace localhost with the local official hostname rs2 = InetAddress.getLocalHost().getHostName(); } rs2Addresses = InetAddress.getAllByName(rs2); } catch (UnknownHostException ex) { // Unknown RS: should not happen return false; } // Now compare addresses, if at least one match, this is the same server for (int i = 0; i < rs1Addresses.length; i++) { InetAddress inetAddress1 = rs1Addresses[i]; for (int j = 0; j < rs2Addresses.length; j++) { InetAddress inetAddress2 = rs2Addresses[j]; if (inetAddress2.equals(inetAddress1)) { return true; } } } return false; } /** * Bag class for keeping info we get from a replication server in order to * compute the best one to connect to. This is in fact a wrapper to a * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be * updated with a info coming from received topology messages or monitoring * messages. 
*/ public static class ReplicationServerInfo { private short protocolVersion; private long generationId; private byte groupId = (byte) -1; private int serverId; // Received server URL private String serverURL; private String baseDn = null; private int windowSize; private ServerState serverState = null; private boolean sslEncryption; private int degradedStatusThreshold = -1; // Keeps the 1 value if created with a ReplServerStartMsg private int weight = 1; // Keeps the 0 value if created with a ReplServerStartMsg private int connectedDSNumber = 0; private List connectedDSs = null; // Is this RS locally configured ? (the RS is recognized as a usable server) private boolean locallyConfigured = true; /** * Create a new instance of ReplicationServerInfo wrapping the passed * message. * @param msg Message to wrap. * @return The new instance wrapping the passed message. * @throws IllegalArgumentException If the passed message has an unexpected * type. */ public static ReplicationServerInfo newInstance( ReplicationMsg msg) throws IllegalArgumentException { if (msg instanceof ReplServerStartMsg) { // This is a ReplServerStartMsg (RS uses protocol V3 or under) ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg) msg; return new ReplicationServerInfo(replServerStartMsg); } else if (msg instanceof ReplServerStartDSMsg) { // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher) ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg) msg; return new ReplicationServerInfo(replServerStartDSMsg); } // Unsupported message type: should not happen throw new IllegalArgumentException("Unexpected PDU type: " + msg.getClass().getName() + " :\n" + msg.toString()); } /** * Constructs a ReplicationServerInfo object wrapping a ReplServerStartMsg. * @param replServerStartMsg The ReplServerStartMsg this object will wrap. 
*/ private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg) { this.protocolVersion = replServerStartMsg.getVersion(); this.generationId = replServerStartMsg.getGenerationId(); this.groupId = replServerStartMsg.getGroupId(); this.serverId = replServerStartMsg.getServerId(); this.serverURL = replServerStartMsg.getServerURL(); this.baseDn = replServerStartMsg.getBaseDn(); this.windowSize = replServerStartMsg.getWindowSize(); this.serverState = replServerStartMsg.getServerState(); this.sslEncryption = replServerStartMsg.getSSLEncryption(); this.degradedStatusThreshold = replServerStartMsg.getDegradedStatusThreshold(); } /** * Constructs a ReplicationServerInfo object wrapping a * ReplServerStartDSMsg. * @param replServerStartDSMsg The ReplServerStartDSMsg this object will * wrap. */ private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg) { this.protocolVersion = replServerStartDSMsg.getVersion(); this.generationId = replServerStartDSMsg.getGenerationId(); this.groupId = replServerStartDSMsg.getGroupId(); this.serverId = replServerStartDSMsg.getServerId(); this.serverURL = replServerStartDSMsg.getServerURL(); this.baseDn = replServerStartDSMsg.getBaseDn(); this.windowSize = replServerStartDSMsg.getWindowSize(); this.serverState = replServerStartDSMsg.getServerState(); this.sslEncryption = replServerStartDSMsg.getSSLEncryption(); this.degradedStatusThreshold = replServerStartDSMsg.getDegradedStatusThreshold(); this.weight = replServerStartDSMsg.getWeight(); this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber(); } /** * Get the server state. * @return The server state */ public ServerState getServerState() { return serverState; } /** * Get the group id. * @return The group id */ public byte getGroupId() { return groupId; } /** * Get the server protocol version. * @return the protocolVersion */ public short getProtocolVersion() { return protocolVersion; } /** * Get the generation id. 
* @return the generationId */ public long getGenerationId() { return generationId; } /** * Get the server id. * @return the serverId */ public int getServerId() { return serverId; } /** * Get the server URL. * @return the serverURL */ public String getServerURL() { return serverURL; } /** * Get the base dn. * @return the baseDn */ public String getBaseDn() { return baseDn; } /** * Get the window size. * @return the windowSize */ public int getWindowSize() { return windowSize; } /** * Get the ssl encryption. * @return the sslEncryption */ public boolean isSslEncryption() { return sslEncryption; } /** * Get the degraded status threshold. * @return the degradedStatusThreshold */ public int getDegradedStatusThreshold() { return degradedStatusThreshold; } /** * Get the weight. * @return the weight; stays at the default value 1 when this object wraps * a ReplServerStartMsg. */ public int getWeight() { return weight; } /** * Get the connected DS number. * @return the connectedDSNumber; stays at the default value 0 when this * object wraps a ReplServerStartMsg. */ public int getConnectedDSNumber() { return connectedDSNumber; } /** * Constructs a new replication server info with the passed RSInfo * internal values and the passed connected DSs. * @param rsInfo The RSInfo to take the values from * @param connectedDSs The new connected DSs */ public ReplicationServerInfo(RSInfo rsInfo, List connectedDSs) { this.serverId = rsInfo.getId(); this.serverURL = rsInfo.getServerUrl(); this.generationId = rsInfo.getGenerationId(); this.groupId = rsInfo.getGroupId(); this.weight = rsInfo.getWeight(); this.connectedDSs = connectedDSs; this.connectedDSNumber = connectedDSs.size(); } /** * Converts the object to a RSInfo object. * @return The RSInfo object matching this object. */ public RSInfo toRSInfo() { return new RSInfo(serverId, serverURL, generationId, groupId, weight); } /** * Updates replication server info with the passed RSInfo internal values * and the passed connected DSs. 
* @param rsInfo The RSInfo to use for the update * @param connectedDSs The new connected DSs */ public void update(RSInfo rsInfo, List connectedDSs) { this.generationId = rsInfo.getGenerationId(); this.groupId = rsInfo.getGroupId(); this.weight = rsInfo.getWeight(); this.connectedDSs = connectedDSs; this.connectedDSNumber = connectedDSs.size(); } /** * Updates replication server info with the passed server state: calls * update() on the current state when one exists, otherwise stores the * passed instance. * @param serverState The ServerState to use for the update */ public void update(ServerState serverState) { if (this.serverState != null) { this.serverState.update(serverState); } else { this.serverState = serverState; } } /** * Get the connected DSs. * @return the connected DSs; null unless set from an RSInfo through the * matching constructor or update method */ public List getConnectedDSs() { return connectedDSs; } /** * Gets the locally configured status for this RS. * @return the locallyConfigured */ public boolean isLocallyConfigured() { return locallyConfigured; } /** * Sets the locally configured status for this RS. * @param locallyConfigured the locallyConfigured to set */ public void setLocallyConfigured(boolean locallyConfigured) { this.locallyConfigured = locallyConfigured; } /** * Returns a string representation of this object. * @return A string representation of this object. */ public String toString() { return "Url:"+ this.getServerURL() + " ServerId:" + this.serverId; } } /** * Connects to a replication server: as ECL when the base DN equals (ignoring * case) the external changelog root DN, as a regular data server otherwise. */ private void connect() { if (this.baseDn.compareToIgnoreCase( ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT) == 0) { connectAsECL(); } else { connectAsDataServer(); } } /** * Contacts all replication servers to get information from them in order to * be able to choose the most suitable one. * @return the collected information. 
*/ private Map collectReplicationServersInfo() { Map rsInfos = new HashMap(); for (String server : servers) { // Connect to server and get info about it ReplicationServerInfo replicationServerInfo = performPhaseOneHandshake(server, false); // Store server info in list if (replicationServerInfo != null) { rsInfos.put(replicationServerInfo.getServerId(), replicationServerInfo); } } return rsInfos; } /** * Special aspects of connecting as ECL compared to connecting as data server * are : * - 1 single RS configured * - so no choice of the preferred RS * - ?? Heartbeat * - Start handshake is : * Broker ---> StartECLMsg ---> RS * <---- ReplServerStartMsg --- * ---> StartSessionECLMsg --> RS */ private void connectAsECL() { // FIXME:ECL List of RS to connect is for now limited to one RS only String bestServer = this.servers.iterator().next(); ReplServerStartDSMsg inReplServerStartDSMsg = performECLPhaseOneHandshake( bestServer, true); if (inReplServerStartDSMsg != null) performECLPhaseTwoHandshake(bestServer); } /** * Connect to a ReplicationServer. * * The handshake sequence between a DS and a RS is divided into 2 logical * consecutive phases (phase 1 and phase 2). DS always initiates connection * and always sends first message: * * DS<->RS: * ------- * * phase 1: * DS --- ServerStartMsg ---> RS * DS <--- ReplServerStartDSMsg --- RS * phase 2: * DS --- StartSessionMsg ---> RS * DS <--- TopologyMsg --- RS * * Before performing a full handshake sequence, DS searches for best suitable * RS by making only phase 1 handshake to every RS it knows then closing * the connection. This allows to gather information on available RSs and then * decide with which RS the full handshake (phase 1 then phase 2) will be * finally performed. * * @throws NumberFormatException address was invalid */ private void connectAsDataServer() { // May have created a broker with null replication domain for // unit test purpose. 
if (domain != null) { // If a first connect or a connection failure occurs, we go through here. // force status machine to NOT_CONNECTED_STATUS so that monitoring can // see that we are not connected. domain.toNotConnectedStatus(); } // Stop any existing poller and heartbeat monitor from a previous session. stopRSHeartBeatMonitoring(); stopChangeTimeHeartBeatPublishing(); mustRunBestServerCheckingAlgorithm = 0; boolean newServerWithSameGroupId = false; synchronized (connectPhaseLock) { /* * Connect to each replication server and get their ServerState then find * out which one is the best to connect to. */ if (debugEnabled()) debugInfo("serverId: " + serverId + " phase 1 : will perform PhaseOneH with each RS in " + " order to elect the preferred one"); // Get info from every available replication server replicationServerInfos = collectReplicationServersInfo(); ReplicationServerInfo replicationServerInfo = null; if (replicationServerInfos.size() > 0) { // At least one server answered, find the best one. 
replicationServerInfo = computeBestReplicationServer(true, -1, state, replicationServerInfos, serverId, baseDn, groupId, this.getGenerationID()); // Best found, now initialize connection to this one (handshake phase 1) if (debugEnabled()) debugInfo("serverId: " + serverId + " phase 2 : will perform PhaseOneH with the preferred RS=" + replicationServerInfo); replicationServerInfo = performPhaseOneHandshake( replicationServerInfo.getServerURL(), true); if (replicationServerInfo != null) { // Update replication server info with potentially more up to date // data (server state for instance may have changed) replicationServerInfos.put(replicationServerInfo.getServerId(), replicationServerInfo); // Handshake phase 1 exchange went well // Compute in which status we are starting the session to tell the RS ServerStatus initStatus = computeInitialServerStatus(replicationServerInfo.getGenerationId(), replicationServerInfo.getServerState(), replicationServerInfo.getDegradedStatusThreshold(), this.getGenerationID()); // Perform session start (handshake phase 2) TopologyMsg topologyMsg = performPhaseTwoHandshake( replicationServerInfo.getServerURL(), initStatus); if (topologyMsg != null) // Handshake phase 2 exchange went well { try { /* * If we just connected to a RS with a different group id than us * (because for instance a RS with our group id was unreachable * while connecting to each RS) but the just received TopologyMsg * shows that at the same time a RS with our group id connected, * we must give up the connection to force reconnection that will * certainly go back to a server with our group id as servers with * our group id have a greater priority for connection (in * computeBestReplicationServer). In other words, we disconnect to * connect to a server with our group id. 
If a server with our * group id comes back later in the topology, we will be advised * upon reception of a new TopologyMsg message and we will force * reconnection at that time to retrieve a server with our group * id. */ byte tmpRsGroupId = replicationServerInfo.getGroupId(); boolean someServersWithSameGroupId = hasSomeServerWithSameGroupId(topologyMsg.getRsList()); // Really no other server with our group id ? if ((tmpRsGroupId == groupId) || ((tmpRsGroupId != groupId) && !someServersWithSameGroupId)) { replicationServer = session.getReadableRemoteAddress(); maxSendWindow = replicationServerInfo.getWindowSize(); rsGroupId = replicationServerInfo.getGroupId(); rsServerId = replicationServerInfo.getServerId(); rsServerUrl = replicationServerInfo.getServerURL(); receiveTopo(topologyMsg); // Log a message to let the administrator know that the failure // was resolved. // Wake up all the threads that were waiting on the window // on the previous connection. connectionError = false; if (sendWindow != null) { sendWindow.release(Integer.MAX_VALUE); } sendWindow = new Semaphore(maxSendWindow); rcvWindow = maxRcvWindow; connected = true; // May have created a broker with null replication domain for // unit test purpose. if (domain != null) { domain.sessionInitiated( initStatus, replicationServerInfo.getServerState(), replicationServerInfo.getGenerationId(), session); } if (getRsGroupId() != groupId) { // Connected to replication server with wrong group id: // warn user and start poller to recover when a server with // right group id arrives... 
Message message = WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get( Byte.toString(groupId), Integer.toString(rsServerId), replicationServerInfo.getServerURL(), Byte.toString(getRsGroupId()), baseDn.toString(), Integer.toString(serverId)); logError(message); } startRSHeartBeatMonitoring(); if (replicationServerInfo.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V3) { startChangeTimeHeartBeatPublishing(); } } else { // Detected new RS with our group id: log disconnection to // inform administrator Message message = NOTE_NEW_SERVER_WITH_SAME_GROUP_ID.get( Byte.toString(groupId), baseDn.toString(), Integer.toString(serverId)); logError(message); // Do not log connection error newServerWithSameGroupId = true; } } catch (Exception e) { Message message = ERR_COMPUTING_FAKE_OPS.get( baseDn, replicationServerInfo.getServerURL(), e.getLocalizedMessage() + stackTraceToSingleLineString(e)); logError(message); } finally { if (connected == false) { if (session != null) { try { session.close(); } catch (IOException e) { // The session was already closed, just ignore. } session = null; } } } } // Could perform handshake phase 2 with best } // Could perform handshake phase 1 with best } // Reached some servers if (connected) { connectPhaseLock.notify(); if ((replicationServerInfo.getGenerationId() == this.getGenerationID()) || (replicationServerInfo.getGenerationId() == -1)) { Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get( baseDn.toString(), Integer.toString(rsServerId), replicationServer, Integer.toString(serverId), Long.toString(this.getGenerationID())); logError(message); } else { Message message = NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get( baseDn.toString(), replicationServer, Long.toString(this.getGenerationID()), Long.toString(replicationServerInfo.getGenerationId())); logError(message); } } else { /* * This server could not find any replicationServer. It's going to start * in degraded mode. Log a message. 
*/ if (!connectionError && !newServerWithSameGroupId) { connectionError = true; connectPhaseLock.notify(); Message message = NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString()); logError(message); } } } } /** * Tells whether the passed RS info list contains at least one server with * our group id. * @param rsInfos The RS info list to inspect * @return true if at least one server has the same group id */ private boolean hasSomeServerWithSameGroupId(List rsInfos) { for (RSInfo rsInfo : rsInfos) { if (rsInfo.getGroupId() == this.groupId) return true; } return false; } /** * Determines the status we are starting with according to our state and the * RS state. * * @param rsGenId The generation id of the RS * @param rsState The server state of the RS * @param degradedStatusThreshold The degraded status threshold of the RS * @param dsGenId The local generation id * @return The initial status: NORMAL_STATUS when the RS has no generation id * (-1), BAD_GEN_ID_STATUS when both generation ids differ, otherwise * DEGRADED_STATUS when the threshold is positive and reached by the * number of changes we are late, NORMAL_STATUS if not */ public ServerStatus computeInitialServerStatus(long rsGenId, ServerState rsState, int degradedStatusThreshold, long dsGenId) { if (rsGenId == -1) { // RS has no generation id return ServerStatus.NORMAL_STATUS; } else { if (rsGenId == dsGenId) { // DS and RS have same generation id // Determine if we are late or not to replay changes. RS uses a // threshold value for pending changes to be replayed by a DS to // determine if the DS is in normal status or in degraded status. // Let's compare the local and remote server state using this threshold // value to determine if we are late or not ServerStatus initStatus = ServerStatus.INVALID_STATUS; int nChanges = ServerState.diffChanges(rsState, state); if (debugEnabled()) { debugInfo("RB for dn " + baseDn + " and with server id " + Integer.toString(serverId) + " computed " + Integer.toString(nChanges) + " changes late."); } // Check status to know if it is relevant to change the status. Do not // take RSD lock to test. If we attempt to change the status whereas // we are in a status that does not allow that, this will be noticed by // the changeStatusFromStatusAnalyzer method. 
This allows to take the // lock roughly only when needed versus every sleep time timeout. if (degradedStatusThreshold > 0) { if (nChanges >= degradedStatusThreshold) { initStatus = ServerStatus.DEGRADED_STATUS; } else { initStatus = ServerStatus.NORMAL_STATUS; } } else { // 0 threshold value means no degrading system used (no threshold): // force normal status initStatus = ServerStatus.NORMAL_STATUS; } return initStatus; } else { // DS and RS do not have same generation id return ServerStatus.BAD_GEN_ID_STATUS; } } } /** * Connect to the provided server performing the first phase handshake * (start messages exchange) and return the reply message from the replication * server, wrapped in a ReplicationServerInfo object. * * @param server Server to connect to. * @param keepConnection Do we keep session opened or not after handshake. * Use true if want to perform handshake phase 2 with the same session * and keep the session to create as the current one. * @return The answer from the server, or null if no answer could be * obtained. */ private ReplicationServerInfo performPhaseOneHandshake(String server, boolean keepConnection) { ReplicationServerInfo replServerInfo = null; // Parse server string. int separator = server.lastIndexOf(':'); String port = server.substring(separator + 1); String hostname = server.substring(0, separator); ProtocolSession localSession = null; boolean error = false; try { /* * Open a socket connection to the next candidate. */ int intPort = Integer.parseInt(port); InetSocketAddress serverAddr = new InetSocketAddress( InetAddress.getByName(hostname), intPort); Socket socket = new Socket(); socket.setReceiveBufferSize(1000000); socket.setTcpNoDelay(true); socket.connect(serverAddr, 500); localSession = replSessionSecurity.createClientSession(server, socket, ReplSessionSecurity.HANDSHAKE_TIMEOUT); boolean isSslEncryption = replSessionSecurity.isSslEncryption(server); /* * Send our ServerStartMsg. 
*/ ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, baseDn, maxRcvWindow, heartbeatInterval, state, ProtocolVersion.getCurrentVersion(), this.getGenerationID(), isSslEncryption, groupId); localSession.publish(serverStartMsg); /* * Read the ReplServerStartMsg or ReplServerStartDSMsg that should come * back. */ ReplicationMsg msg = localSession.receive(); if (debugEnabled()) { debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() + "\nAND RECEIVED:\n" + msg.toString()); } // Wrap received message in a server info object replServerInfo = ReplicationServerInfo.newInstance(msg); // Sanity check String repDn = replServerInfo.getBaseDn(); if (!(this.baseDn.equals(repDn))) { Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(), this.baseDn); logError(message); error = true; } /* * We have sent our own protocol version to the replication server. * The replication server will use the same one (or an older one * if it is an old replication server). 
*/ protocolVersion = ProtocolVersion.minWithCurrent( replServerInfo.getProtocolVersion()); localSession.setProtocolVersion(protocolVersion); if (!isSslEncryption) { localSession.stopEncryption(); } } catch (ConnectException e) { /* * There was no server waiting on this host:port * Log a notice and try the next replicationServer in the list */ if (!connectionError) { Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server); if (keepConnection) // Log error message only for final connection { // the error message is only logged once to avoid overflowing // the error log logError(message); } else if (debugEnabled()) { debugInfo(message.toString()); } } error = true; } catch (Exception e) { if ((e instanceof SocketTimeoutException) && debugEnabled()) { debugInfo("Timeout trying to connect to RS " + server + " for dn: " + baseDn); } Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1", baseDn, server, e.getLocalizedMessage() + stackTraceToSingleLineString(e)); if (keepConnection) // Log error message only for final connection { logError(message); } else if (debugEnabled()) { debugInfo(message.toString()); } error = true; } // Close session if requested if (!keepConnection || error) { if (localSession != null) { if (debugEnabled()) debugInfo("In RB, closing session after phase 1"); if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // V4 protocol introduces a StopMsg to properly end communications if (!error) { try { localSession.publish(new StopMsg()); } catch (IOException ioe) { // Anyway, going to close session, so nothing to do } } } try { localSession.close(); } catch (IOException e) { // The session was already closed, just ignore. } localSession = null; } if (error) { replServerInfo = null; } // Be sure to return null. } // If this connection as the one to use for sending and receiving updates, // store it. 
if (keepConnection) { session = localSession; } return replServerInfo; } /** * Connect to the provided server performing the first phase handshake * (start messages exchange) and return the reply message from the replication * server. * * @param server Server to connect to. * @param keepConnection Do we keep session opened or not after handshake. * Use true if want to perform handshake phase 2 with the same session * and keep the session to create as the current one. * @return The ReplServerStartDSMsg the server replied. Null if could not * get an answer. */ private ReplServerStartDSMsg performECLPhaseOneHandshake(String server, boolean keepConnection) { ReplServerStartDSMsg replServerStartDSMsg = null; // Parse server string. int separator = server.lastIndexOf(':'); String port = server.substring(separator + 1); String hostname = server.substring(0, separator); ProtocolSession localSession = null; boolean error = false; try { /* * Open a socket connection to the next candidate. */ int intPort = Integer.parseInt(port); InetSocketAddress serverAddr = new InetSocketAddress( InetAddress.getByName(hostname), intPort); Socket socket = new Socket(); socket.setReceiveBufferSize(1000000); socket.setTcpNoDelay(true); socket.connect(serverAddr, 500); localSession = replSessionSecurity.createClientSession(server, socket, ReplSessionSecurity.HANDSHAKE_TIMEOUT); boolean isSslEncryption = replSessionSecurity.isSslEncryption(server); // Send our start msg. ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg( baseDn, 0, 0, 0, 0, maxRcvWindow, heartbeatInterval, state, ProtocolVersion.getCurrentVersion(), this.getGenerationID(), isSslEncryption, groupId); localSession.publish(serverStartECLMsg); // Read the ReplServerStartMsg that should come back. 
replServerStartDSMsg = (ReplServerStartDSMsg) localSession.receive(); if (debugEnabled()) { debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() + "\nAND RECEIVED:\n" + replServerStartDSMsg.toString()); } // Sanity check String repDn = replServerStartDSMsg.getBaseDn(); if (!(this.baseDn.equals(repDn))) { Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(), this.baseDn); logError(message); error = true; } /* * We have sent our own protocol version to the replication server. * The replication server will use the same one (or an older one * if it is an old replication server). */ if (keepConnection) protocolVersion = ProtocolVersion.minWithCurrent( replServerStartDSMsg.getVersion()); localSession.setProtocolVersion(protocolVersion); if (!isSslEncryption) { localSession.stopEncryption(); } } catch (ConnectException e) { /* * There was no server waiting on this host:port * Log a notice and try the next replicationServer in the list */ if (!connectionError) { Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server); if (keepConnection) // Log error message only for final connection { // the error message is only logged once to avoid overflowing // the error log logError(message); } else if (debugEnabled()) { debugInfo(message.toString()); } } error = true; } catch (Exception e) { if ((e instanceof SocketTimeoutException) && debugEnabled()) { debugInfo("Timeout trying to connect to RS " + server + " for dn: " + baseDn); } Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1", baseDn, server, e.getLocalizedMessage() + stackTraceToSingleLineString(e)); if (keepConnection) // Log error message only for final connection { logError(message); } else if (debugEnabled()) { debugInfo(message.toString()); } error = true; } // Close session if requested if (!keepConnection || error) { if (localSession != null) { if (debugEnabled()) debugInfo("In RB, closing session after phase 1"); // V4 protocol introduces a StopMsg 
to properly end communications if (!error) { try { localSession.publish(new StopMsg()); } catch (IOException ioe) { // Anyway, going to close session, so nothing to do } } try { localSession.close(); } catch (IOException e) { // The session was already closed, just ignore. } localSession = null; } if (error) { replServerStartDSMsg = null; } // Be sure to return null. } // If this connection as the one to use for sending and receiving updates, // store it. if (keepConnection) { session = localSession; } return replServerStartDSMsg; } /** * Performs the second phase handshake (send StartSessionMsg and receive * TopologyMsg messages exchange) and return the reply message from the * replication server. * * @param server Server we are connecting with. * @param initStatus The status we are starting with * @return The ReplServerStartMsg the server replied. Null if could not * get an answer. */ private TopologyMsg performECLPhaseTwoHandshake(String server) { TopologyMsg topologyMsg = null; try { // Send our Start Session StartECLSessionMsg startECLSessionMsg = null; startECLSessionMsg = new StartECLSessionMsg(); startECLSessionMsg.setOperationId("-1"); session.publish(startECLSessionMsg); /* FIXME:ECL In the handshake phase two, should RS send back a topo msg ? * Read the TopologyMsg that should come back. topologyMsg = (TopologyMsg) session.receive(); */ if (debugEnabled()) { debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" + startECLSessionMsg.toString()); // + "\nAND RECEIVED:\n" + topologyMsg.toString()); } // Alright set the timeout to the desired value session.setSoTimeout(timeout); connected = true; } catch (Exception e) { Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2", baseDn, server, e.getLocalizedMessage() + stackTraceToSingleLineString(e)); logError(message); if (session != null) { try { session.close(); } catch (IOException ex) { // The session was already closed, just ignore. } session = null; } // Be sure to return null. 
topologyMsg = null; } return topologyMsg; } /** * Performs the second phase handshake (send StartSessionMsg and receive * TopologyMsg messages exchange) and return the reply message from the * replication server. * * @param server Server we are connecting with. * @param initStatus The status we are starting with * @return The ReplServerStartMsg the server replied. Null if could not * get an answer. */ private TopologyMsg performPhaseTwoHandshake(String server, ServerStatus initStatus) { TopologyMsg topologyMsg = null; try { /* * Send our StartSessionMsg. */ StartSessionMsg startSessionMsg = null; // May have created a broker with null replication domain for // unit test purpose. if (domain != null) { startSessionMsg = new StartSessionMsg( initStatus, domain.getRefUrls(), domain.isAssured(), domain.getAssuredMode(), domain.getAssuredSdLevel()); startSessionMsg.setEclIncludes( domain.getEclInclude(domain.getServerId())); } else { startSessionMsg = new StartSessionMsg(initStatus, new ArrayList()); } session.publish(startSessionMsg); /* * Read the TopologyMsg that should come back. */ topologyMsg = (TopologyMsg) session.receive(); if (debugEnabled()) { debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" + startSessionMsg.toString() + "\nAND RECEIVED:\n" + topologyMsg.toString()); } // Alright set the timeout to the desired value session.setSoTimeout(timeout); } catch (Exception e) { Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2", baseDn, server, e.getLocalizedMessage() + stackTraceToSingleLineString(e)); logError(message); if (session != null) { try { session.close(); } catch (IOException ex) { // The session was already closed, just ignore. } session = null; } // Be sure to return null. topologyMsg = null; } return topologyMsg; } /** * Returns the replication server that best fits our need so that we can * connect to it or determine if we must disconnect from current one to * re-connect to best server. 
* * Note: this method is static for test purpose (access from unit tests) * * @param firstConnection True if we run this method for the very first * connection of the broker. False if we run this method to determine if the * replication server we are currently connected to is still the best or not. * @param rsServerId The id of the replication server we are currently * connected to. Only used when firstConnection is false. * @param myState The local server state. * @param rsInfos The list of available replication servers and their * associated information (choice will be made among them). * @param localServerId The server id for the suffix we are working for. * @param baseDn The suffix for which we are working for. * @param groupId The groupId we prefer being connected to if possible * @param generationId The generation id we are using * @return The computed best replication server. If the returned value is * null, the best replication server is undetermined but the local server must * disconnect (so the best replication server is another one than the current * one). Null can only be returned when firstConnection is false. */ public static ReplicationServerInfo computeBestReplicationServer( boolean firstConnection, int rsServerId, ServerState myState, Map rsInfos, int localServerId, String baseDn, byte groupId, long generationId) { // Shortcut, if only one server, this is the best if (rsInfos.size() == 1) { return rsInfos.values().iterator().next(); } /** * Apply some filtering criteria to determine the best servers list from * the available ones. 
The ordered list of criteria is (from more important * to less important): * - replication server has the same group id as the local DS one * - replication server has the same generation id as the local DS one * - replication server is up to date regarding changes generated by the * local DS * - replication server in the same VM as local DS one */ Map bestServers = rsInfos; Map newBestServers; // The list of best replication servers is filtered with each criteria. At // each criteria, the list is replaced with the filtered one if some there // are some servers from the filtering, otherwise, the list is left as is // and the new filtering for the next criteria is applied and so on. for (int filterLevel = 1; filterLevel <= 4; filterLevel++) { newBestServers = null; switch (filterLevel) { case 1: // Use only servers locally configured: those are servers declared in // the local configuration. When the current method is called, for // sure, at least one server from the list is locally configured bestServers = filterServersLocallyConfigured(bestServers); break; case 2: // Some servers with same group id ? newBestServers = filterServersWithSameGroupId(bestServers, groupId); if (newBestServers.size() > 0) { bestServers = newBestServers; } break; case 3: // Some servers with same generation id ? newBestServers = filterServersWithSameGenerationId(bestServers, generationId); if (newBestServers.size() > 0) { // Ok some servers with the right generation id bestServers = newBestServers; // If some servers with the right generation id this is useful to // run the local DS change criteria newBestServers = filterServersWithAllLocalDSChanges(bestServers, myState, localServerId); if (newBestServers.size() > 0) { bestServers = newBestServers; } } break; case 4: // Some servers in the local VM ? 
newBestServers = filterServersInSameVM(bestServers); if (newBestServers.size() > 0) { bestServers = newBestServers; } break; } } /** * Now apply the choice base on the weight to the best servers list */ if (bestServers.size() > 1) { if (firstConnection) { // We are no connected to a server yet return computeBestServerForWeight(bestServers, -1, -1); } else { // We are already connected to a RS: compute the best RS as far as the // weights is concerned. If this is another one, some DS must // disconnect. return computeBestServerForWeight(bestServers, rsServerId, localServerId); } } else { return bestServers.values().iterator().next(); } } /** * Creates a new list that contains only replication servers that are locally * configured. * @param bestServers The list of replication servers to filter * @return The sub list of replication servers locally configured */ private static Map filterServersLocallyConfigured(Map bestServers) { Map result = new HashMap(); for (Integer rsId : bestServers.keySet()) { ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); if (replicationServerInfo.isLocallyConfigured()) { result.put(rsId, replicationServerInfo); } } return result; } /** * Creates a new list that contains only replication servers that have the * passed group id, from a passed replication server list. 
* @param bestServers The list of replication servers to filter * @param groupId The group id that must match * @return The sub list of replication servers matching the requested group id * (which may be empty) */ private static Map filterServersWithSameGroupId(Map bestServers, byte groupId) { Map result = new HashMap(); for (Integer rsId : bestServers.keySet()) { ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); if (replicationServerInfo.getGroupId() == groupId) { result.put(rsId, replicationServerInfo); } } return result; } /** * Creates a new list that contains only replication servers that have the * provided generation id, from a provided replication server list. * When the selected replication servers have no change (empty serverState) * then the 'empty'(generationId==-1) replication servers are also included * in the result list. * * @param bestServers The list of replication servers to filter * @param generationId The generation id that must match * @return The sub list of replication servers matching the requested * generation id (which may be empty) */ private static Map filterServersWithSameGenerationId(Map bestServers, long generationId) { Map result = new HashMap(); boolean emptyState = true; for (Integer rsId : bestServers.keySet()) { ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); if (replicationServerInfo.getGenerationId() == generationId) { result.put(rsId, replicationServerInfo); if (!replicationServerInfo.serverState.isEmpty()) emptyState = false; } } if (emptyState) { // If the RS with a generationId have all an empty state, // then the 'empty'(genId=-1) RSes are also candidate for (Integer rsId : bestServers.keySet()) { ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); if (replicationServerInfo.getGenerationId() == -1) { result.put(rsId, replicationServerInfo); } } } return result; } /** * Creates a new list that contains only replication servers that have the * latest changes from the 
passed DS, from a passed replication server list. * @param bestServers The list of replication servers to filter * @param localState The state of the local DS * @param localServerId The server id to consider for the changes * @return The sub list of replication servers that have the latest changes * from the passed DS (which may be empty) */ private static Map filterServersWithAllLocalDSChanges(Map bestServers, ServerState localState, int localServerId) { Map upToDateServers = new HashMap(); Map moreUpToDateServers = new HashMap(); // Extract the change number of the latest change generated by the local // server ChangeNumber myChangeNumber = localState.getMaxChangeNumber(localServerId); if (myChangeNumber == null) { myChangeNumber = new ChangeNumber(0, 0, localServerId); } /** * Find replication servers who are up to date (or more up to date than us, * if for instance we failed and restarted, having sent some changes to the * RS but without having time to store our own state) regarding our own * server id. If some servers more up to date, prefer this list but take * only the latest change number. */ ChangeNumber latestRsChangeNumber = null; for (Integer rsId : bestServers.keySet()) { ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); ServerState rsState = replicationServerInfo.getServerState(); ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerId); if (rsChangeNumber == null) { rsChangeNumber = new ChangeNumber(0, 0, localServerId); } // Has this replication server the latest local change ? 
if (myChangeNumber.olderOrEqual(rsChangeNumber)) { if (myChangeNumber.equals(rsChangeNumber)) { // This replication server has exactly the latest change from the // local server upToDateServers.put(rsId, replicationServerInfo); } else { // This replication server is even more up to date than the local // server if (latestRsChangeNumber == null) { // Initialize the latest change number latestRsChangeNumber = rsChangeNumber; } if (rsChangeNumber.newerOrEquals(latestRsChangeNumber)) { if (rsChangeNumber.equals(latestRsChangeNumber)) { moreUpToDateServers.put(rsId, replicationServerInfo); } else { // This RS is even more up to date, clear the list and store this // new RS moreUpToDateServers.clear(); moreUpToDateServers.put(rsId, replicationServerInfo); latestRsChangeNumber = rsChangeNumber; } } } } } if (moreUpToDateServers.size() > 0) { // Prefer servers more up to date than local server return moreUpToDateServers; } else { return upToDateServers; } } /** * Creates a new list that contains only replication servers that are in the * same VM as the local DS, from a passed replication server list. * @param bestServers The list of replication servers to filter * @return The sub list of replication servers being in the same VM as the * local DS (which may be empty) */ private static Map filterServersInSameVM( Map bestServers) { Map result = new HashMap(); for (Integer rsId : bestServers.keySet()) { ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); if (ReplicationServer.isLocalReplicationServer( replicationServerInfo.getServerURL())) { result.put(rsId, replicationServerInfo); } } return result; } /** * Computes the best replication server the local server should be connected * to so that the load is correctly spread across the topology, following the * weights guidance. 
* Warning: This method is expected to be called with at least 2 servers in * bestServers * Note: this method is static for test purpose (access from unit tests) * @param bestServers The list of replication servers to consider * @param currentRsServerId The replication server the local server is * currently connected to. -1 if the local server is not yet connected * to any replication server. * @param localServerId The server id of the local server. This is not used * when it is not connected to a replication server * (currentRsServerId = -1) * @return The replication server the local server should be connected to * as far as the weight is concerned. This may be the currently used one if * the weight is correctly spread. If the returned value is null, the best * replication server is undetermined but the local server must disconnect * (so the best replication server is another one than the current one). */ public static ReplicationServerInfo computeBestServerForWeight( Map bestServers, int currentRsServerId, int localServerId) { /* * - Compute the load goal of each RS, deducing it from the weights affected * to them. * - Compute the current load of each RS, deducing it from the DSs * currently connected to them. * - Compute the differences between the load goals and the current loads of * the RSs. 
*/ // Sum of the weights int sumOfWeights = 0; // Sum of the connected DSs int sumOfConnectedDSs = 0; for (ReplicationServerInfo replicationServerInfo : bestServers.values()) { sumOfWeights += replicationServerInfo.getWeight(); sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber(); } // Distance (difference) of the current loads to the load goals of each RS: // key:server id, value: distance Map loadDistances = new HashMap(); // Precision for the operations (number of digits after the dot) // Default value of rounding method is HALF_UP for // the MathContext MathContext mathContext = new MathContext(32); for (Integer rsId : bestServers.keySet()) { ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); int rsWeight = replicationServerInfo.getWeight(); // load goal = rs weight / sum of weights BigDecimal loadGoalBd = (new BigDecimal(rsWeight)).divide( new BigDecimal(sumOfWeights), mathContext); BigDecimal currentLoadBd = BigDecimal.ZERO; if (sumOfConnectedDSs != 0) { // current load = number of connected DSs / total number of DSs int connectedDSs = replicationServerInfo.getConnectedDSNumber(); currentLoadBd = (new BigDecimal(connectedDSs)).divide( new BigDecimal(sumOfConnectedDSs), mathContext); } // load distance = load goal - current load BigDecimal loadDistanceBd = loadGoalBd.subtract(currentLoadBd, mathContext); loadDistances.put(rsId, loadDistanceBd); } if (currentRsServerId == -1) { // The local server is not connected yet /* * Find the server with the current highest distance to its load goal and * choose it. 
Make an exception if every server is correctly balanced, * that is every current load distances are equal to 0, in that case, * choose the server with the highest weight */ int bestRsId = 0; // If all server equal, return the first one float highestDistance = Float.NEGATIVE_INFINITY; boolean allRsWithZeroDistance = true; int highestWeightRsId = -1; int highestWeight = -1; for (Integer rsId : bestServers.keySet()) { float loadDistance = loadDistances.get(rsId).floatValue(); if (loadDistance > highestDistance) { // This server is far more from its balance point bestRsId = rsId; highestDistance = loadDistance; } if (loadDistance != (float)0) { allRsWithZeroDistance = false; } int weight = bestServers.get(rsId).getWeight(); if (weight > highestWeight) { // This server has a higher weight highestWeightRsId = rsId; highestWeight = weight; } } // All servers with a 0 distance ? if (allRsWithZeroDistance) { // Choose server withe the highest weight bestRsId = highestWeightRsId; } return bestServers.get(bestRsId); } else { // The local server is currently connected to a RS, let's see if it must // disconnect or not, taking the weights into account. float currentLoadDistance = loadDistances.get(currentRsServerId).floatValue(); if (currentLoadDistance < (float) 0) { // Too much DSs connected to the current RS, compared with its load // goal: // Determine the potential number of DSs to disconnect from the current // RS and see if the local DS is part of them: the DSs that must // disconnect are those with the lowest server id. // Compute the sum of the distances of the load goals of the other RSs BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO; for (Integer rsId : bestServers.keySet()) { if (rsId != currentRsServerId) { sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add( loadDistances.get(rsId), mathContext); } } if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > (float) 0) { // The average distance of the other RSs shows a lack of DSs. 
// Compute the number of DSs to disconnect from the current RS, // rounding to the nearest integer number. Do only this if there is // no risk of yoyo effect: when the exact balance cannot be // established due to the current number of DSs connected, do not // disconnect a DS. A simple example where the balance cannot be // reached is: // - RS1 has weight 1 and 2 DSs // - RS2 has weight 1 and 1 DS // => disconnecting a DS from RS1 to reconnect it to RS2 would have no // sense as this would lead to the reverse situation. In that case, // the perfect balance cannot be reached and we must stick to the // current situation, otherwise the DS would keep move between the 2 // RSs float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd. multiply(new BigDecimal(sumOfConnectedDSs), mathContext). floatValue(); int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber); // Avoid yoyo effect if (overloadingDSsNumber == 1) { // What would be the new load distance for the current RS if // we disconnect some DSs ? ReplicationServerInfo currentReplicationServerInfo = bestServers.get(currentRsServerId); int currentRsWeight = currentReplicationServerInfo.getWeight(); BigDecimal currentRsWeightBd = new BigDecimal(currentRsWeight); BigDecimal sumOfWeightsBd = new BigDecimal(sumOfWeights); BigDecimal currentRsLoadGoalBd = currentRsWeightBd.divide(sumOfWeightsBd, mathContext); BigDecimal potentialCurrentRsNewLoadBd = new BigDecimal(0); if (sumOfConnectedDSs != 0) { int connectedDSs = currentReplicationServerInfo. getConnectedDSNumber(); BigDecimal potentialNewConnectedDSsBd = new BigDecimal(connectedDSs - 1); BigDecimal sumOfConnectedDSsBd = new BigDecimal(sumOfConnectedDSs); potentialCurrentRsNewLoadBd = potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd, mathContext); } BigDecimal potentialCurrentRsNewLoadDistanceBd = currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd, mathContext); // What would be the new load distance for the other RSs ? 
BigDecimal additionalDsLoadBd = (new BigDecimal(1)).divide( new BigDecimal(sumOfConnectedDSs), mathContext); BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd, mathContext); // Now compare both values: we must no disconnect the DS if this // is for going in a situation where the load distance of the other // RSs is the opposite of the future load distance of the local RS // or we would evaluate that we should disconnect just after being // arrived on the new RS. But we should disconnect if we reach the // perfect balance (both values are 0). MathContext roundMc = new MathContext(6, RoundingMode.DOWN); BigDecimal potentialCurrentRsNewLoadDistanceBdRounded = potentialCurrentRsNewLoadDistanceBd.round(roundMc); BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded = potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc); if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo( BigDecimal.ZERO) != 0) && (potentialCurrentRsNewLoadDistanceBdRounded.equals( potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate()))) { // Avoid the yoyo effect, and keep the local DS connected to its // current RS return bestServers.get(currentRsServerId); } } // Prepare a sorted list (from lowest to highest) or DS server ids // connected to the current RS ReplicationServerInfo currentRsInfo = bestServers.get(currentRsServerId); List serversConnectedToCurrentRS = currentRsInfo.getConnectedDSs(); List sortedServers = new ArrayList( serversConnectedToCurrentRS); Collections.sort(sortedServers); // Go through the list of DSs to disconnect and see if the local // server is part of them. int index = 0; while (overloadingDSsNumber > 0) { int severToDisconnectId = sortedServers.get(index); if (severToDisconnectId == localServerId) { // The local server is part of the DSs to disconnect return null; } overloadingDSsNumber--; index++; } // The local server is not part of the servers to disconnect from the // current RS. 
return bestServers.get(currentRsServerId);
        } else
        {
          // The average distance of the other RSs does not show a lack of DSs:
          // no need to disconnect any DS from the current RS.
          return bestServers.get(currentRsServerId);
        }
      } else
      {
        // The RS load goal is reached or there are not enough DSs connected to
        // it to reach it: do not disconnect from this RS and return rsInfo for
        // this RS
        return bestServers.get(currentRsServerId);
      }
    }
  }

  /**
   * Start the heartbeat monitor thread.
   * <p>
   * Nothing is started unless {@code heartbeatInterval} is strictly positive.
   * The monitor is created on the current {@code session} and is named after
   * the replication server, its id, the base DN and the local server id.
   */
  private void startRSHeartBeatMonitoring()
  {
    // Start a heartbeat monitor thread.
    if (heartbeatInterval > 0)
    {
      heartbeatMonitor =
        new HeartbeatMonitor("Replication Heartbeat Monitor on RS "
        + getReplicationServer() + " " + rsServerId + " for " + baseDn
        + " in DS " + serverId, session, heartbeatInterval,
        // NOTE(review): the meaning of this flag is defined by
        // HeartbeatMonitor; here it is simply "protocol is V4 or later".
        (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4));
      heartbeatMonitor.start();
    }
  }

  /**
   * Stop the heartbeat monitor thread.
   * <p>
   * Does nothing when no monitor is currently registered; otherwise shuts the
   * monitor down and clears the reference.
   */
  synchronized void stopRSHeartBeatMonitoring()
  {
    if (heartbeatMonitor != null)
    {
      heartbeatMonitor.shutdown();
      heartbeatMonitor = null;
    }
  }

  /**
   * Restart the ReplicationBroker, treating the current session as the
   * failing one.
   *
   * @param infiniteTry when {@code true} the reconnection is retried until it
   *        succeeds or the broker is shut down; when {@code false} a single
   *        attempt is made
   */
  public void reStart(boolean infiniteTry)
  {
    reStart(this.session, infiniteTry);
  }

  /**
   * Restart the ReplicationServer broker after a failure.
*
   * When a failing session is supplied it is sent a StopMsg (protocol V4 and
   * later only), closed, and counted as a lost connection. If that session is
   * the broker's current one, the connection state is reset. The broker then
   * tries to connect again, pausing 500 ms between failed attempts.
   *
   * @param failingSession the session which failed, or {@code null} when
   *        there is no session to close
   * @param infiniteTry when {@code true} the connection is retried until it
   *        succeeds or the broker is shut down; when {@code false} the loop
   *        stops after the first unsuccessful attempt
   */
  public void reStart(ProtocolSession failingSession, boolean infiniteTry)
  {
    if (failingSession != null)
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end communications
        try
        {
          failingSession.publish(new StopMsg());
        } catch (IOException ioe)
        {
          // Anyway, going to close session, so nothing to do
        }
      }
      try
      {
        failingSession.close();
      } catch (IOException e1)
      {
        // ignore
      }
      numLostConnections++;
    }

    // Only forget the current connection when it is the one that failed
    // (this also matches when both references are null).
    if (failingSession == session)
    {
      this.connected = false;
      rsGroupId = (byte) -1;
      rsServerId = -1;
      rsServerUrl = null;
      session = null;
    }
    while (!this.connected && (!this.shutdown))
    {
      try
      {
        this.connect();
      } catch (Exception e)
      {
        // A failed attempt is logged and then retried (or abandoned) below.
        MessageBuilder mb = new MessageBuilder();
        mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
          baseDn, e.getLocalizedMessage()));
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
      // Single-shot mode: give up after the first unsuccessful attempt.
      if ((!connected) && (!infiniteTry))
        break;
      if ((!connected) && (!shutdown))
      {
        try
        {
          Thread.sleep(500);
        } catch (InterruptedException e)
        {
          // ignore
        }
      }
    }
    if (debugEnabled())
      debugInfo(this + " end restart : connected=" + connected
        + " with RSid=" + this.getRsServerId()
        + " genid=" + this.generationID);
  }

  /**
   * Publish a message to the other servers.
   * The message is sent as a normal (non recovery) message and the send is
   * retried on failure.
   * @param msg the message to publish
   */
  public void publish(ReplicationMsg msg)
  {
    _publish(msg, false, true);
  }

  /**
   * Publish a message to the other servers.
   * @param msg The message to publish.
   * @param retryOnFailure Whether reconnect should automatically be done.
   * @return Whether publish succeeded.
   */
  public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
  {
    return _publish(msg, false, retryOnFailure);
  }

  /**
   * Publish a recovery message to the other servers.
   * The send is retried on failure.
   * @param msg the message to publish
   */
  public void publishRecovery(ReplicationMsg msg)
  {
    _publish(msg, true, true);
  }

  /**
   * Publish a message to the other servers.
* @param msg the message to publish * @param recoveryMsg the message is a recovery Message * @param retryOnFailure whether retry should be done on failure * @return whether the message was successfully sent. */ boolean _publish(ReplicationMsg msg, boolean recoveryMsg, boolean retryOnFailure) { boolean done = false; while (!done && !shutdown) { if (connectionError) { // It was not possible to connect to any replication server. // Since the operation was already processed, we have no other // choice than to return without sending the ReplicationMsg // and relying on the resend procedure of the connect phase to // fix the problem when we finally connect. if (debugEnabled()) { debugInfo("ReplicationBroker.publish() Publishing a " + "message is not possible due to existing connection error."); } return false; } try { boolean credit; ProtocolSession current_session; Semaphore currentWindowSemaphore; // save the session at the time when we acquire the // sendwindow credit so that we can make sure later // that the session did not change in between. // This is necessary to make sure that we don't publish a message // on a session with a credit that was acquired from a previous // session. synchronized (connectPhaseLock) { current_session = session; currentWindowSemaphore = sendWindow; } // If the Replication domain has decided that there is a need to // recover some changes then it is not allowed to send this // change but it will be the responsibility of the recovery thread to // do it. if (!recoveryMsg & connectRequiresRecovery) { return false; } if (msg instanceof UpdateMsg) { // Acquiring the window credit must be done outside of the // connectPhaseLock because it can be blocking and we don't // want to hold off reconnection in case the connection dropped. 
credit = currentWindowSemaphore.tryAcquire( (long) 500, TimeUnit.MILLISECONDS); } else { credit = true; } if (credit) { synchronized (connectPhaseLock) { // session may have been set to null in the connection phase // when restarting the broker for example. // check the session. If it has changed, some // deconnection/reconnection happened and we need to restart from // scratch. if ((session != null) && (session == current_session)) { session.publish(msg); done = true; } } } if ((!credit) && (currentWindowSemaphore.availablePermits() == 0)) { // the window is still closed. // Send a WindowProbeMsg message to wakeup the receiver in case the // window update message was lost somehow... // then loop to check again if connection was closed. session.publish(new WindowProbeMsg()); } } catch (IOException e) { if (!retryOnFailure) return false; // The receive threads should handle reconnection or // mark this broker in error. Just retry. synchronized (connectPhaseLock) { try { connectPhaseLock.wait(100); } catch (InterruptedException e1) { // ignore if (debugEnabled()) { debugInfo("ReplicationBroker.publish() " + "Interrupted exception raised : " + e.getLocalizedMessage()); } } } } catch (InterruptedException e) { // just loop. if (debugEnabled()) { debugInfo("ReplicationBroker.publish() " + "Interrupted exception raised." + e.getLocalizedMessage()); } } } return true; } /** * Receive a message. * This method is not multithread safe and should either always be * called in a single thread or protected by a locking mechanism * before being called. This is a wrapper to the method with a boolean version * so that we do not have to modify existing tests. * * @return the received message * @throws SocketTimeoutException if the timeout set by setSoTimeout * has expired */ public ReplicationMsg receive() throws SocketTimeoutException { return receive(false, true, false); } /** * Receive a message. 
* This method is not multithread safe and should either always be * called in a single thread or protected by a locking mechanism * before being called. * * @throws SocketTimeoutException if the timeout set by setSoTimeout * has expired * @param reconnectToTheBestRS Whether broker will automatically switch * to the best suitable RS. * @param reconnectOnFailure Whether broker will automatically reconnect * on failure. * @param returnOnTopoChange Whether broker should return TopologyMsg * received. * @return the received message * * @throws SocketTimeoutException if the timeout set by setSoTimeout * has expired */ public ReplicationMsg receive(boolean reconnectToTheBestRS, boolean reconnectOnFailure, boolean returnOnTopoChange) throws SocketTimeoutException { while (shutdown == false) { if ((reconnectOnFailure) && (!connected)) { // infinite try to reconnect reStart(null, true); } ProtocolSession failingSession = session; try { ReplicationMsg msg = session.receive(); if (msg instanceof UpdateMsg) { synchronized (this) { rcvWindow--; } } if (msg instanceof WindowMsg) { WindowMsg windowMsg = (WindowMsg) msg; sendWindow.release(windowMsg.getNumAck()); } else if (msg instanceof TopologyMsg) { TopologyMsg topoMsg = (TopologyMsg) msg; receiveTopo(topoMsg); if (reconnectToTheBestRS) { // Reset wait time before next computation of best server mustRunBestServerCheckingAlgorithm = 0; } // Caller wants to check what's changed if (returnOnTopoChange) return msg; } else if (msg instanceof StopMsg) { /* * RS performs a proper disconnection */ Message message = NOTE_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(replicationServer, Integer.toString(rsServerId), baseDn.toString(), Integer.toString(serverId)); logError(message); // Try to find a suitable RS this.reStart(failingSession, true); } else if (msg instanceof MonitorMsg) { // This is the response to a MonitorRequest that was sent earlier or // the regular message of the monitoring publisher of the RS. 
// Extract and store replicas ServerStates replicaStates = new HashMap(); MonitorMsg monitorMsg = (MonitorMsg) msg; Iterator it = monitorMsg.ldapIterator(); while (it.hasNext()) { int srvId = it.next(); replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId)); } // Notify the sender that the response was received. synchronized (monitorResponse) { monitorResponse.set(true); monitorResponse.notify(); } // Update the replication servers ServerStates with new received info it = monitorMsg.rsIterator(); while (it.hasNext()) { int srvId = it.next(); ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId); if (rsInfo != null) { rsInfo.update(monitorMsg.getRSServerState(srvId)); } } // Now if it is allowed, compute the best replication server to see if // it is still the one we are currently connected to. If not, // disconnect properly and let the connection algorithm re-connect to // best replication server if (reconnectToTheBestRS) { mustRunBestServerCheckingAlgorithm++; if (mustRunBestServerCheckingAlgorithm == 2) { // Stable topology (no topo msg since few seconds): proceed with // best server checking. ReplicationServerInfo bestServerInfo = computeBestReplicationServer(false, rsServerId, state, replicationServerInfos, serverId, baseDn, groupId, generationID); if ((rsServerId != -1) && ((bestServerInfo == null) || (bestServerInfo.getServerId() != rsServerId))) { // The best replication server is no more the one we are // currently using. Disconnect properly then reconnect. 
Message message = NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(), Integer.toString(serverId), Integer.toString(rsServerId), rsServerUrl, Integer.toString(bestServerInfo.getServerId())); logError(message); reStart(true); } // Reset wait time before next computation of best server mustRunBestServerCheckingAlgorithm = 0; } } } else { return msg; } } catch (SocketTimeoutException e) { throw e; } catch (Exception e) { if (shutdown == false) { if ((session == null) || (!session.closeInitiated())) { /* * We did not initiate the close on our side, log an error message. */ Message message = ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer, Integer.toString(rsServerId), baseDn.toString(), Integer.toString(serverId)); logError(message); } if (reconnectOnFailure) reStart(failingSession, true); else break; // does not seem necessary to explicitely disconnect .. } } } // while !shutdown return null; } /** * Gets the States of all the Replicas currently in the * Topology. * When this method is called, a Monitoring message will be sent * to the Replication Server to which this domain is currently connected * so that it computes a table containing information about * all Directory Servers in the topology. * This Computation involves communications will all the servers * currently connected and * * @return The States of all Replicas in the topology (except us) */ public Map getReplicaStates() { monitorResponse.set(false); // publish Monitor Request Message to the Replication Server publish(new MonitorRequestMsg(serverId, getRsServerId())); // wait for Response up to 10 seconds. try { synchronized (monitorResponse) { if (monitorResponse.get() == false) { monitorResponse.wait(10000); } } } catch (InterruptedException e) { } return replicaStates; } /** * This method allows to do the necessary computing for the window * management after treatment by the worker threads. 
* * This should be called once the replay thread have done their job * and the window can be open again. */ public synchronized void updateWindowAfterReplay() { try { updateDoneCount++; if ((updateDoneCount >= halfRcvWindow) && (session != null)) { session.publish(new WindowMsg(updateDoneCount)); rcvWindow += updateDoneCount; updateDoneCount = 0; } } catch (IOException e) { // Any error on the socket will be handled by the thread calling receive() // just ignore. } } /** * stop the server. */ public void stop() { if (debugEnabled()) debugInfo("ReplicationBroker " + serverId + " is stopping and will" + " close the connection to replication server " + rsServerId + " for" + " domain " + baseDn); stopRSHeartBeatMonitoring(); stopChangeTimeHeartBeatPublishing(); replicationServer = "stopped"; shutdown = true; connected = false; rsGroupId = (byte) -1; rsServerId = -1; rsServerUrl = null; try { if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // V4 protocol introduces a StopMsg to properly end communications session.publish(new StopMsg()); } session.close(); } catch (Exception e) { // Anyway, going to close session, so nothing to do } } /** * Set a timeout value. * With this option set to a non-zero value, calls to the receive() method * block for only this amount of time after which a * java.net.SocketTimeoutException is raised. * The Broker is valid and usable even after such an Exception is raised. * * @param timeout the specified timeout, in milliseconds. * @throws SocketException if there is an error in the underlying protocol, * such as a TCP error. */ public void setSoTimeout(int timeout) throws SocketException { this.timeout = timeout; if (session != null) { session.setSoTimeout(timeout); } } /** * Get the name of the replicationServer to which this broker is currently * connected. * * @return the name of the replicationServer to which this domain * is currently connected. 
*/ public String getReplicationServer() { return replicationServer; } /** * Get the maximum receive window size. * * @return The maximum receive window size. */ public int getMaxRcvWindow() { return maxRcvWindow; } /** * Get the current receive window size. * * @return The current receive window size. */ public int getCurrentRcvWindow() { return rcvWindow; } /** * Get the maximum send window size. * * @return The maximum send window size. */ public int getMaxSendWindow() { return maxSendWindow; } /** * Get the current send window size. * * @return The current send window size. */ public int getCurrentSendWindow() { if (connected) { return sendWindow.availablePermits(); } else { return 0; } } /** * Get the number of times the connection was lost. * @return The number of times the connection was lost. */ public int getNumLostConnections() { return numLostConnections; } /** * Change some configuration parameters. * * @param replicationServers The new list of replication servers. * @param window The max window size. * @param heartbeatInterval The heartBeat interval. * * @return A boolean indicating if the changes * requires to restart the service. * @param groupId The new group id to use */ public boolean changeConfig( Collection replicationServers, int window, long heartbeatInterval, byte groupId) { // These parameters needs to be renegotiated with the ReplicationServer // so if they have changed, that requires restarting the session with // the ReplicationServer. Boolean needToRestartSession = false; // A new session is necessary only when information regarding // the connection is modified if ((servers == null) || (!(replicationServers.size() == servers.size() && replicationServers. 
containsAll(servers))) || window != this.maxRcvWindow || heartbeatInterval != this.heartbeatInterval || (groupId != this.groupId)) { needToRestartSession = true; } this.servers = replicationServers; this.rcvWindow = window; this.maxRcvWindow = window; this.halfRcvWindow = window / 2; this.heartbeatInterval = heartbeatInterval; this.groupId = groupId; return needToRestartSession; } /** * Get the version of the replication protocol. * @return The version of the replication protocol. */ public short getProtocolVersion() { return protocolVersion; } /** * Check if the broker is connected to a ReplicationServer and therefore * ready to received and send Replication Messages. * * @return true if the server is connected, false if not. */ public boolean isConnected() { return connected; } private boolean debugEnabled() { return false; } private static final void debugInfo(String s) { logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); TRACER.debugInfo(s); } /** * Determine whether the connection to the replication server is encrypted. * @return true if the connection is encrypted, false otherwise. */ public boolean isSessionEncrypted() { boolean isEncrypted = false; if (session != null) { return session.isEncrypted(); } return isEncrypted; } /** * Signals the RS we just entered a new status. * @param newStatus The status the local DS just entered */ public void signalStatusChange(ServerStatus newStatus) { try { ChangeStatusMsg csMsg = new ChangeStatusMsg(ServerStatus.INVALID_STATUS, newStatus); session.publish(csMsg); } catch (IOException ex) { Message message = ERR_EXCEPTION_SENDING_CS.get( baseDn, Integer.toString(serverId), ex.getLocalizedMessage() + stackTraceToSingleLineString(ex)); logError(message); } } /** * Sets the group id of the broker. * @param groupId The new group id. */ public void setGroupId(byte groupId) { this.groupId = groupId; } /** * Gets the info for DSs in the topology (except us). 
* @return The info for DSs in the topology (except us) */ public List getDsList() { return dsList; } /** * Gets the info for RSs in the topology (except the one we are connected * to). * @return The info for RSs in the topology (except the one we are connected * to) */ public List getRsList() { List result = new ArrayList(); for (ReplicationServerInfo replicationServerInfo : replicationServerInfos.values()) { result.add(replicationServerInfo.toRSInfo()); } return result; } /** * Computes the list of DSs connected to a particular RS. * @param rsId The RS id of the server one wants to know the connected DSs * @param dsList The list of DSinfo from which to compute things * @return The list of connected DSs to the server rsId */ private List computeConnectedDSs(int rsId, List dsList) { List connectedDSs = new ArrayList(); if (rsServerId == rsId) { // If we are computing connected DSs for the RS we are connected // to, we should count the local DS as the DSInfo of the local DS is not // sent by the replication server in the topology message. We must count // ourself as a connected server. connectedDSs.add(serverId); } for (DSInfo dsInfo : dsList) { if (dsInfo.getRsId() == rsId) connectedDSs.add(dsInfo.getDsId()); } return connectedDSs; } /** * Processes an incoming TopologyMsg. * Updates the structures for the local view of the topology. * * @param topoMsg The topology information received from RS. 
*/
  public void receiveTopo(TopologyMsg topoMsg)
  {
    if (debugEnabled())
      debugInfo(this + " receive TopologyMsg=" + topoMsg);

    // Store new DS list
    dsList = topoMsg.getDsList();

    // Update replication server info list with the received topology
    // information
    List<Integer> rsToKeepList = new ArrayList<Integer>();
    for (RSInfo rsInfo : topoMsg.getRsList())
    {
      int rsId = rsInfo.getId();
      rsToKeepList.add(rsId); // Mark this server as still existing
      List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
      ReplicationServerInfo replicationServerInfo =
        replicationServerInfos.get(rsId);
      if (replicationServerInfo == null)
      {
        // New replication server, create info for it add it to the list
        replicationServerInfo =
          new ReplicationServerInfo(rsInfo, connectedDSs);
        // Set the locally configured flag for this new RS only if it is
        // configured
        updateRSInfoLocallyConfiguredStatus(replicationServerInfo);
        replicationServerInfos.put(rsId, replicationServerInfo);
      } else
      {
        // Update the existing info for the replication server
        replicationServerInfo.update(rsInfo, connectedDSs);
      }
    }

    // Now remove any replication server that may have disappeared from the
    // topology: only the ids listed in the received message are kept.
    replicationServerInfos.keySet().retainAll(rsToKeepList);

    if (domain != null)
    {
      for (DSInfo info : dsList)
      {
        domain.setEclInclude(info.getDsId(), info.getEclIncludes());
      }
    }
  }

  /**
   * Check if the broker could not find any Replication Server and therefore
   * connection attempt failed.
   *
   * @return true if the server could not connect to any Replication Server.
   */
  public boolean hasConnectionError()
  {
    return connectionError;
  }

  /**
   * Starts publishing to the RS the current timestamp used in this server.
   * Nothing is started unless the change time heartbeat send interval is
   * strictly positive.
   */
  public void startChangeTimeHeartBeatPublishing()
  {
    // Start a CN heartbeat thread.
    if (changeTimeHeartbeatSendInterval > 0)
    {
      ctHeartbeatPublisherThread =
        new CTHeartbeatPublisherThread(
        "Replication CN Heartbeat sender for " +
        baseDn + " with " + getReplicationServer(),
        session, changeTimeHeartbeatSendInterval, serverId);
      ctHeartbeatPublisherThread.start();
    } else
    {
      if (debugEnabled())
        debugInfo(this + " is not configured to send CN heartbeat interval");
    }
  }

  /**
   * Stops publishing to the RS the current timestamp used in this server.
   * Does nothing when no publisher thread is currently registered.
   */
  public synchronized void stopChangeTimeHeartBeatPublishing()
  {
    if (ctHeartbeatPublisherThread != null)
    {
      ctHeartbeatPublisherThread.shutdown();
      ctHeartbeatPublisherThread = null;
    }
  }

  /**
   * Set a new change time heartbeat interval to this broker.
   * The current publisher is stopped and a new one is started with the new
   * interval (if that interval is strictly positive).
   * @param changeTimeHeartbeatInterval The new interval (in ms).
   */
  public void setChangeTimeHeartbeatInterval(int changeTimeHeartbeatInterval)
  {
    stopChangeTimeHeartBeatPublishing();
    this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
    startChangeTimeHeartBeatPublishing();
  }

  /**
   * Set the connectRequiresRecovery to the provided value.
   * This flag is used to indicate if a recovery of Update is necessary
   * after a reconnection to a RS.
   * It is the responsibility of the ReplicationDomain to set it during the
   * sessionInitiated phase.
   *
   * @param b the new value of the connectRequiresRecovery.
   */
  public void setRecoveryRequired(boolean b)
  {
    connectRequiresRecovery = b;
  }

  /**
   * Returns whether the broker is shutting down.
   * @return whether the broker is shutting down.
   */
  public boolean shuttingDown()
  {
    return shutdown;
  }
}