| | |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.*; |
| | |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | private volatile boolean shutdown = false; |
| | | private final Object startStopLock = new Object(); |
| | | /** |
| | | * Replication server URLs under this format: "<code>hostname:port</code>". |
| | | */ |
| | | private volatile Set<String> replicationServerUrls; |
| | | private volatile ReplicationDomainCfg config; |
| | | private volatile boolean connected = false; |
| | | /** |
| | | * String reported under CSN=monitor when there is no connected RS. |
| | |
| | | private volatile String replicationServer = NO_CONNECTED_SERVER; |
| | | private volatile Session session; |
| | | private final ServerState state; |
| | | private final DN baseDN; |
| | | private final int serverId; |
| | | private Semaphore sendWindow; |
| | | private int maxSendWindow; |
| | | private int rcvWindow = 100; |
| | | private int halfRcvWindow = rcvWindow / 2; |
| | | private int maxRcvWindow = rcvWindow; |
| | | private int timeout = 0; |
| | | private short protocolVersion; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | /** My group id. */ |
| | | private byte groupId = -1; |
| | | /** The group id of the RS we are connected to. */ |
| | | private byte rsGroupId = -1; |
| | | /** The server id of the RS we are connected to. */ |
| | |
| | | private Map<Integer, ServerState> replicaStates = |
| | | new HashMap<Integer, ServerState>(); |
| | | /** |
| | | * The expected duration in milliseconds between heartbeats received |
| | | * from the replication server. Zero means heartbeats are off. |
| | | */ |
| | | private long heartbeatInterval = 0; |
| | | /** |
| | | * A thread to monitor heartbeats on the session. |
| | | */ |
| | | private HeartbeatMonitor heartbeatMonitor; |
| | |
| | | * change time of this DS. |
| | | */ |
| | | private CTHeartbeatPublisherThread ctHeartbeatPublisherThread; |
| | | /** |
| | | * The expected period in milliseconds between two sends of these messages |
| | | * to the replication server. Zero means heartbeats are off. |
| | | */ |
| | | private long changeTimeHeartbeatSendInterval = 0; |
| | | /* |
| | | * Properties for the last topology info received from the network. |
| | | */ |
| | |
| | | * @param replicationDomain The replication domain that is creating us. |
| | | * @param state The ServerState that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param baseDN The base DN that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param serverId The server ID that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param window The size of the send and receive window to use. |
| | | * @param config The configuration to use. |
| | | * @param generationId The generationId for the server associated to the |
| | | * provided serverId and for the domain associated to the provided baseDN. |
| | | * @param heartbeatInterval The interval (in ms) between heartbeats requested |
| | | * from the replicationServer, or zero if no heartbeats are requested. |
| | | * @param replSessionSecurity The session security configuration. |
| | | * @param groupId The group id of our domain. |
| | | * @param changeTimeHeartbeatInterval The interval (in ms) at which change |
| | | * time heartbeats are sent to the RS, |
| | | * or zero if no CSN heartbeat should be sent. |
| | | */ |
| | | public ReplicationBroker(ReplicationDomain replicationDomain, |
| | | ServerState state, DN baseDN, int serverId, int window, |
| | | long generationId, long heartbeatInterval, |
| | | ReplSessionSecurity replSessionSecurity, byte groupId, |
| | | long changeTimeHeartbeatInterval) |
| | | ServerState state, ReplicationDomainCfg config, long generationId, |
| | | ReplSessionSecurity replSessionSecurity) |
| | | { |
| | | this.domain = replicationDomain; |
| | | this.baseDN = baseDN; |
| | | this.serverId = serverId; |
| | | this.state = state; |
| | | this.config = config; |
| | | this.protocolVersion = ProtocolVersion.getCurrentVersion(); |
| | | this.replSessionSecurity = replSessionSecurity; |
| | | this.groupId = groupId; |
| | | this.generationID = generationId; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.rcvWindow = window; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window / 2; |
| | | this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval; |
| | | this.rcvWindow = getMaxRcvWindow(); |
| | | this.halfRcvWindow = rcvWindow / 2; |
| | | |
| | | /* |
| | | * Only create a monitor if there is a replication domain (this is not the |
| | |
| | | synchronized (startStopLock) |
| | | { |
| | | shutdown = false; |
| | | this.rcvWindow = this.maxRcvWindow; |
| | | connect(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Starts the ReplicationBroker. |
| | | * <p> |
| | | * While holding startStopLock, this clears the shutdown flag, stores the |
| | | * provided replication server URLs, resets the receive window and then |
| | | * calls connect(). |
| | | * <p> |
| | | * When the provided set is empty a notice is logged, but connect() is |
| | | * still called afterwards. |
| | | * <p> |
| | | * NOTE(review): the two consecutive assignments to rcvWindow below look |
| | | * like the old and the new side of a change shown together - confirm |
| | | * which one is the live statement. |
| | | * |
| | | * @param replicationServers the replication server URLs to use; must not |
| | | * be null because its size is read |
| | | */ |
| | | public void start(Set<String> replicationServers) |
| | | { |
| | | synchronized (startStopLock) |
| | | { |
| | | // Clear the shutdown flag and record the server list before connecting. |
| | | shutdown = false; |
| | | this.replicationServerUrls = replicationServers; |
| | | |
| | | if (this.replicationServerUrls.size() < 1) |
| | | { |
| | | Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get(); |
| | | logError(message); |
| | | } |
| | | |
| | | this.rcvWindow = this.maxRcvWindow; |
| | | this.rcvWindow = getMaxRcvWindow(); |
| | | connect(); |
| | | } |
| | | } |
| | |
| | | */ |
| | | public int getServerId() |
| | | { |
| | | return serverId; |
| | | return config.getServerId(); |
| | | } |
| | | |
| | | private DN getBaseDN() |
| | | { |
| | | return config.getBaseDN(); |
| | | } |
| | | |
| | | private Set<String> getReplicationServerUrls() |
| | | { |
| | | return config.getReplicationServer(); |
| | | } |
| | | |
| | | private byte getGroupId() |
| | | { |
| | | return (byte) config.getGroupId(); |
| | | } |
| | | |
| | | /** |
| | |
| | | replicationServerInfo.setLocallyConfigured(false); |
| | | return; |
| | | } |
| | | for (String serverUrl : replicationServerUrls) |
| | | for (String serverUrl : getReplicationServerUrls()) |
| | | { |
| | | if (isSameReplicationServerUrl(serverUrl, rsUrl)) |
| | | { |
| | |
| | | |
| | | private void connect() |
| | | { |
| | | if (this.baseDN.toNormalizedString().equalsIgnoreCase( |
| | | if (getBaseDN().toNormalizedString().equalsIgnoreCase( |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)) |
| | | { |
| | | connectAsECL(); |
| | |
| | | Map<Integer, ReplicationServerInfo> rsInfos = |
| | | new ConcurrentHashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | for (String serverUrl : replicationServerUrls) |
| | | for (String serverUrl : getReplicationServerUrls()) |
| | | { |
| | | // Connect to server and get info about it |
| | | ReplicationServerInfo replicationServerInfo = |
| | |
| | | private void connectAsECL() |
| | | { |
| | | // FIXME:ECL List of RS to connect is for now limited to one RS only |
| | | String bestServer = this.replicationServerUrls.iterator().next(); |
| | | String bestServer = getReplicationServerUrls().iterator().next(); |
| | | |
| | | if (performPhaseOneHandshake(bestServer, true, true) != null) |
| | | { |
| | |
| | | |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | final int serverId = getServerId(); |
| | | final DN baseDN = getBaseDN(); |
| | | |
| | | /* |
| | | * Connect to each replication server and get their ServerState then find |
| | | * out which one is the best to connect to. |
| | |
| | | { |
| | | // At least one server answered, find the best one. |
| | | electedRsInfo = computeBestReplicationServer(true, -1, state, |
| | | replicationServerInfos, serverId, groupId, getGenerationID()); |
| | | replicationServerInfos, serverId, getGroupId(), getGenerationID()); |
| | | |
| | | // Best found, now initialize connection to this one (handshake phase 1) |
| | | if (debugEnabled()) |
| | |
| | | if (replicationServerInfos.size() > 0) |
| | | { |
| | | Message message = WARN_COULD_NOT_FIND_CHANGELOG.get( |
| | | serverId, |
| | | baseDN.toNormalizedString(), |
| | | serverId, baseDN.toNormalizedString(), |
| | | collectionToString(replicationServerInfos.keySet(), ", ")); |
| | | logError(message); |
| | | } |
| | |
| | | private void connectToReplicationServer(ReplicationServerInfo rsInfo, |
| | | ServerStatus initStatus, TopologyMsg topologyMsg) |
| | | { |
| | | final int serverId = getServerId(); |
| | | final DN baseDN = getBaseDN(); |
| | | try |
| | | { |
| | | replicationServer = session.getReadableRemoteAddress(); |
| | |
| | | } |
| | | } |
| | | sendWindow = new Semaphore(maxSendWindow); |
| | | rcvWindow = maxRcvWindow; |
| | | rcvWindow = getMaxRcvWindow(); |
| | | connected = true; |
| | | |
| | | /* |
| | |
| | | .getGenerationId(), session); |
| | | } |
| | | |
| | | final byte groupId = getGroupId(); |
| | | if (getRsGroupId() != groupId) |
| | | { |
| | | /* |
| | |
| | | int nChanges = ServerState.diffChanges(rsState, state); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("RB for dn " + baseDN + " and with server id " |
| | | + serverId + " computed " + nChanges + " changes late."); |
| | | TRACER.debugInfo("RB for dn " + getBaseDN() + " and with server id " |
| | | + getServerId() + " computed " + nChanges + " changes late."); |
| | | } |
| | | |
| | | /* |
| | |
| | | StartMsg serverStartMsg; |
| | | if (!isECL) |
| | | { |
| | | serverStartMsg = new ServerStartMsg(serverId, url, |
| | | baseDN, maxRcvWindow, heartbeatInterval, state, |
| | | getGenerationID(), isSslEncryption, groupId); |
| | | serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(), |
| | | getMaxRcvWindow(), config.getHeartbeatInterval(), state, |
| | | getGenerationID(), isSslEncryption, getGroupId()); |
| | | } |
| | | else |
| | | { |
| | | serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | getGenerationID(), isSslEncryption, groupId); |
| | | getMaxRcvWindow(), config.getHeartbeatInterval(), state, |
| | | getGenerationID(), isSslEncryption, getGroupId()); |
| | | } |
| | | localSession.publish(serverStartMsg); |
| | | |
| | |
| | | ReplicationMsg msg = localSession.receive(); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n" |
| | | TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n" |
| | | + serverStartMsg + "\nAND RECEIVED:\n" + msg); |
| | | } |
| | | |
| | |
| | | |
| | | // Sanity check |
| | | DN repDN = replServerInfo.getBaseDN(); |
| | | if (!baseDN.equals(repDN)) |
| | | if (!getBaseDN().equals(repDN)) |
| | | { |
| | | errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get( |
| | | repDN.toNormalizedString(), baseDN.toNormalizedString()); |
| | | repDN.toNormalizedString(), getBaseDN().toNormalizedString()); |
| | | return null; |
| | | } |
| | | |
| | |
| | | } |
| | | catch (ConnectException e) |
| | | { |
| | | errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId, |
| | | server, baseDN.toNormalizedString()); |
| | | errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(), |
| | | server, getBaseDN().toNormalizedString()); |
| | | return null; |
| | | } |
| | | catch (SocketTimeoutException e) |
| | | { |
| | | errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(serverId, |
| | | server, baseDN.toNormalizedString()); |
| | | errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(), |
| | | server, getBaseDN().toNormalizedString()); |
| | | return null; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e)); |
| | | errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(getServerId(), |
| | | server, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e)); |
| | | return null; |
| | | } |
| | | finally |
| | |
| | | // FIXME ECL In the handshake phase two, should RS send back a topo msg ? |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n" |
| | | TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n" |
| | | + startECLSessionMsg); |
| | | } |
| | | |
| | |
| | | connected = true; |
| | | } catch (Exception e) |
| | | { |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e)); |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get( |
| | | getServerId(), server, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | setSession(null); |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n" |
| | | TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n" |
| | | + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg); |
| | | } |
| | | |
| | |
| | | return topologyMsg; |
| | | } catch (Exception e) |
| | | { |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e)); |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get( |
| | | getServerId(), server, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | setSession(null); |
| | |
| | | Map<Integer, ReplicationServerInfo> rsInfos, int localServerId, |
| | | byte groupId, long generationId) |
| | | { |
| | | |
| | | // Shortcut, if only one server, this is the best |
| | | if (rsInfos.size() == 1) |
| | | { |
| | |
| | | { |
| | | Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | if (replicationServerInfo.isLocallyConfigured()) |
| | | ReplicationServerInfo rsInfo = entry.getValue(); |
| | | if (rsInfo.isLocallyConfigured()) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | result.put(entry.getKey(), rsInfo); |
| | | } |
| | | } |
| | | return result; |
| | |
| | | { |
| | | Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | if (replicationServerInfo.getGroupId() == groupId) |
| | | ReplicationServerInfo rsInfo = entry.getValue(); |
| | | if (rsInfo.getGroupId() == groupId) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | result.put(entry.getKey(), rsInfo); |
| | | } |
| | | } |
| | | return result; |
| | |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | boolean emptyState = true; |
| | | |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | if (replicationServerInfo.getGenerationId() == generationId) |
| | | ReplicationServerInfo rsInfo = entry.getValue(); |
| | | if (rsInfo.getGenerationId() == generationId) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | if (!replicationServerInfo.serverState.isEmpty()) |
| | | result.put(entry.getKey(), rsInfo); |
| | | if (!rsInfo.serverState.isEmpty()) |
| | | emptyState = false; |
| | | } |
| | | } |
| | |
| | | { |
| | | // If all the RSes with a generationId have an empty state, |
| | | // then the 'empty' (genId=-1) RSes are also candidates |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | if (replicationServerInfo.getGenerationId() == -1) |
| | | ReplicationServerInfo rsInfo = entry.getValue(); |
| | | if (rsInfo.getGenerationId() == -1) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | result.put(entry.getKey(), rsInfo); |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Find replication servers who are up to date (or more up to date than us, |
| | | * Find replication servers that are up to date (or more up to date than us, |
| | | * if for instance we failed and restarted, having sent some changes to the |
| | | * RS but without having time to store our own state) regarding our own |
| | | * server id. If some servers more up to date, prefer this list but take |
| | | * server id. If some servers are more up to date, prefer this list but take |
| | | * only the latest CSN. |
| | | */ |
| | | CSN latestRsCSN = null; |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | ServerState rsState = replicationServerInfo.getServerState(); |
| | | CSN rsCSN = rsState.getCSN(localServerId); |
| | | final Integer rsId = entry.getKey(); |
| | | final ReplicationServerInfo rsInfo = entry.getValue(); |
| | | CSN rsCSN = rsInfo.getServerState().getCSN(localServerId); |
| | | if (rsCSN == null) |
| | | { |
| | | rsCSN = new CSN(0, 0, localServerId); |
| | |
| | | { |
| | | // This replication server has exactly the latest change from the |
| | | // local server |
| | | upToDateServers.put(rsId, replicationServerInfo); |
| | | upToDateServers.put(rsId, rsInfo); |
| | | } else |
| | | { |
| | | // This replication server is even more up to date than the local |
| | |
| | | { |
| | | if (rsCSN.equals(latestRsCSN)) |
| | | { |
| | | moreUpToDateServers.put(rsId, replicationServerInfo); |
| | | moreUpToDateServers.put(rsId, rsInfo); |
| | | } else |
| | | { |
| | | // This RS is even more up to date, clear the list and store this |
| | | // new RS |
| | | moreUpToDateServers.clear(); |
| | | moreUpToDateServers.put(rsId, replicationServerInfo); |
| | | moreUpToDateServers.put(rsId, rsInfo); |
| | | latestRsCSN = rsCSN; |
| | | } |
| | | } |
| | |
| | | * Initially look for all servers on the same host. If we find one in the |
| | | * same VM, then narrow the search. |
| | | */ |
| | | boolean filterServersInSameVM = false; |
| | | Map<Integer, ReplicationServerInfo> result = |
| | | boolean foundRSInSameVM = false; |
| | | final Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | final HostPort hp = |
| | | HostPort.valueOf(replicationServerInfo.getServerURL()); |
| | | final Integer rsId = entry.getKey(); |
| | | final ReplicationServerInfo rsInfo = entry.getValue(); |
| | | final HostPort hp = HostPort.valueOf(rsInfo.getServerURL()); |
| | | if (hp.isLocalAddress()) |
| | | { |
| | | if (isLocalReplicationServerPort(hp.getPort())) |
| | | { |
| | | // An RS in the same VM will always have priority. |
| | | if (!filterServersInSameVM) |
| | | if (!foundRSInSameVM) |
| | | { |
| | | // An RS in the same VM will always have priority. |
| | | // Narrow the search to only include servers in this VM. |
| | | result.clear(); |
| | | filterServersInSameVM = true; |
| | | foundRSInSameVM = true; |
| | | } |
| | | result.put(rsId, replicationServerInfo); |
| | | result.put(rsId, rsInfo); |
| | | } |
| | | else if (!filterServersInSameVM) |
| | | else if (!foundRSInSameVM) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | // OK, accept RSs on the same machine because we have not found an RS |
| | | // in the same VM yet |
| | | result.put(rsId, rsInfo); |
| | | } |
| | | else |
| | | { |
| | |
| | | Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>(); |
| | | // Precision for the operations (number of digits after the dot) |
| | | final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP); |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | final Integer rsId = entry.getKey(); |
| | | final ReplicationServerInfo rsInfo = entry.getValue(); |
| | | |
| | | int rsWeight = replicationServerInfo.getWeight(); |
| | | // load goal = rs weight / sum of weights |
| | | BigDecimal loadGoalBd = BigDecimal.valueOf(rsWeight).divide( |
| | | BigDecimal loadGoalBd = BigDecimal.valueOf(rsInfo.getWeight()).divide( |
| | | BigDecimal.valueOf(sumOfWeights), mathContext); |
| | | BigDecimal currentLoadBd = BigDecimal.ZERO; |
| | | if (sumOfConnectedDSs != 0) |
| | | { |
| | | // current load = number of connected DSs / total number of DSs |
| | | int connectedDSs = replicationServerInfo.getConnectedDSNumber(); |
| | | int connectedDSs = rsInfo.getConnectedDSNumber(); |
| | | currentLoadBd = BigDecimal.valueOf(connectedDSs).divide( |
| | | BigDecimal.valueOf(sumOfConnectedDSs), mathContext); |
| | | } |
| | |
| | | mathContext); |
| | | |
| | | /* |
| | | Now compare both values: we must no disconnect the DS if this |
| | | Now compare both values: we must not disconnect the DS if this |
| | | is for going in a situation where the load distance of the other |
| | | RSs is the opposite of the future load distance of the local RS |
| | | or we would evaluate that we should disconnect just after being |
| | |
| | | /** |
| | | * Starts monitoring heartbeats on the current session. |
| | | * <p> |
| | | * The heartbeat interval is read from the configuration. Only when it is |
| | | * greater than zero is a new HeartbeatMonitor created, stored in the |
| | | * heartbeatMonitor field and started; otherwise nothing is done. |
| | | * <p> |
| | | * NOTE(review): the two argument lines of the HeartbeatMonitor constructor |
| | | * call (one using baseDN, one using getBaseDN()) look like the old and |
| | | * the new side of a change shown together - confirm which one is live. |
| | | */ private void startRSHeartBeatMonitoring() |
| | | { |
| | | // Start a heartbeat monitor thread (only for a positive interval). |
| | | final long heartbeatInterval = config.getHeartbeatInterval(); |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(), |
| | | baseDN.toNormalizedString(), session, heartbeatInterval); |
| | | getBaseDN().toNormalizedString(), session, heartbeatInterval); |
| | | heartbeatMonitor.start(); |
| | | } |
| | | } |
| | |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get( |
| | | baseDN.toNormalizedString(), e.getLocalizedMessage())); |
| | | getBaseDN().toNormalizedString(), e.getLocalizedMessage())); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | } |
| | |
| | | break; |
| | | } |
| | | |
| | | final int serverId = getServerId(); |
| | | final DN baseDN = getBaseDN(); |
| | | final int previousRsServerID = rsServerId; |
| | | try |
| | | { |
| | |
| | | // best server checking. |
| | | final ReplicationServerInfo bestServerInfo = |
| | | computeBestReplicationServer(false, previousRsServerID, state, |
| | | replicationServerInfos, serverId, groupId, generationID); |
| | | replicationServerInfos, serverId, getGroupId(), |
| | | generationID); |
| | | if (previousRsServerID != -1 |
| | | && (bestServerInfo == null |
| | | || bestServerInfo.getServerId() != previousRsServerID)) |
| | |
| | | monitorResponse.set(false); |
| | | |
| | | // publish Monitor Request Message to the Replication Server |
| | | publish(new MonitorRequestMsg(serverId, getRsServerId())); |
| | | publish(new MonitorRequestMsg(getServerId(), getRsServerId())); |
| | | |
| | | // wait for Response up to 10 seconds. |
| | | try |
| | |
| | | public void stop() |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("ReplicationBroker " + serverId + " is stopping and will" |
| | | + " close the connection to replication server " + rsServerId + " for" |
| | | + " domain " + baseDN); |
| | | TRACER.debugInfo("ReplicationBroker " + getServerId() + " is stopping" |
| | | + " and will close the connection to replication server " |
| | | + rsServerId + " for domain " + getBaseDN()); |
| | | |
| | | synchronized (startStopLock) |
| | | { |
| | |
| | | */ |
| | | public int getMaxRcvWindow() |
| | | { |
| | | return maxRcvWindow; |
| | | return config.getWindowSize(); |
| | | } |
| | | |
| | | /** |
| | |
| | | /** |
| | | * Change some configuration parameters. |
| | | * |
| | | * @param replicationServers The new list of replication servers. |
| | | * @param window The max window size. |
| | | * @param heartbeatInterval The heartBeat interval. |
| | | * |
| | | * @param newConfig The new config to use. |
| | | * @return A boolean indicating whether the changes |
| | | * require the session to be restarted. |
| | | * @param groupId The new group id to use |
| | | */ |
| | | public boolean changeConfig(Set<String> replicationServers, int window, |
| | | long heartbeatInterval, byte groupId) |
| | | public boolean changeConfig(ReplicationDomainCfg newConfig) |
| | | { |
| | | // These parameters need to be renegotiated with the ReplicationServer |
| | | // so if they have changed, that requires restarting the session with |
| | |
| | | // A new session is necessary only when information regarding |
| | | // the connection is modified |
| | | boolean needToRestartSession = |
| | | this.replicationServerUrls == null |
| | | || !replicationServers.equals(this.replicationServerUrls) |
| | | || window != this.maxRcvWindow |
| | | || heartbeatInterval != this.heartbeatInterval |
| | | || groupId != this.groupId; |
| | | !newConfig.getReplicationServer().equals(config.getReplicationServer()) |
| | | || newConfig.getWindowSize() != config.getWindowSize() |
| | | || newConfig.getHeartbeatInterval() != config.getHeartbeatInterval() |
| | | || newConfig.getGroupId() != config.getGroupId(); |
| | | |
| | | this.replicationServerUrls = replicationServers; |
| | | this.rcvWindow = window; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window / 2; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.groupId = groupId; |
| | | this.config = newConfig; |
| | | this.rcvWindow = newConfig.getWindowSize(); |
| | | this.halfRcvWindow = this.rcvWindow / 2; |
| | | |
| | | return needToRestartSession; |
| | | } |
| | |
| | | } catch (IOException ex) |
| | | { |
| | | Message message = ERR_EXCEPTION_SENDING_CS.get( |
| | | baseDN.toNormalizedString(), |
| | | Integer.toString(serverId), |
| | | getBaseDN().toNormalizedString(), |
| | | Integer.toString(getServerId()), |
| | | ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex)); |
| | | logError(message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Sets the group id of the broker. |
| | | * <p> |
| | | * Only the groupId field is updated; this method does not itself restart |
| | | * or renegotiate any session. |
| | | * |
| | | * @param groupId The new group id. |
| | | */ |
| | | public void setGroupId(byte groupId) |
| | | { |
| | | this.groupId = groupId; |
| | | } |
| | | |
| | | /** |
| | | * Gets the info for DSs in the topology (except us). |
| | | * @return The info for DSs in the topology (except us) |
| | | */ |
| | |
| | | sent by the replication server in the topology message. We must count |
| | | ourselves as a connected server. |
| | | */ |
| | | connectedDSs.add(serverId); |
| | | connectedDSs.add(getServerId()); |
| | | } |
| | | |
| | | for (DSInfo dsInfo : dsList) |
| | |
| | | /** |
| | | * Starts publishing to the RS the current timestamp used in this server. |
| | | */ |
| | | public void startChangeTimeHeartBeatPublishing() |
| | | private void startChangeTimeHeartBeatPublishing() |
| | | { |
| | | // Start a CSN heartbeat thread. |
| | | if (changeTimeHeartbeatSendInterval > 0) |
| | | long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval(); |
| | | if (changeTimeHeartbeatInterval > 0) |
| | | { |
| | | final Session localSession = session; |
| | | final String threadName = "Replica DS(" + getServerId() |
| | | + ") change time heartbeat publisher for domain \"" |
| | | + baseDN + "\" to RS(" + getRsServerId() |
| | | + getBaseDN() + "\" to RS(" + getRsServerId() |
| | | + ") at " + localSession.getReadableRemoteAddress(); |
| | | |
| | | ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( |
| | | threadName, localSession, changeTimeHeartbeatSendInterval, serverId); |
| | | threadName, localSession, changeTimeHeartbeatInterval, getServerId()); |
| | | ctHeartbeatPublisherThread.start(); |
| | | } else |
| | | } |
| | | else |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(this |
| | |
| | | /** |
| | | * Stops publishing to the RS the current timestamp used in this server. |
| | | */ |
| | | public synchronized void stopChangeTimeHeartBeatPublishing() |
| | | private synchronized void stopChangeTimeHeartBeatPublishing() |
| | | { |
| | | if (ctHeartbeatPublisherThread != null) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Sets a new change time heartbeat interval on this broker. |
| | | * <p> |
| | | * Change time heartbeat publishing is stopped, the new interval is stored, |
| | | * and publishing is then started again, in that order, so that the |
| | | * restart sees the new value. |
| | | * |
| | | * @param changeTimeHeartbeatInterval The new interval (in ms). |
| | | */ |
| | | public void setChangeTimeHeartbeatInterval(int changeTimeHeartbeatInterval) |
| | | { |
| | | stopChangeTimeHeartBeatPublishing(); |
| | | this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval; |
| | | startChangeTimeHeartBeatPublishing(); |
| | | } |
| | | |
| | | /** |
| | | * Set the connectRequiresRecovery to the provided value. |
| | | * This flag is used to indicate if a recovery of Update is necessary |
| | | * after a reconnection to a RS. |
| | |
| | | { |
| | | final StringBuilder sb = new StringBuilder(); |
| | | sb.append(getClass().getSimpleName()) |
| | | .append(" \"").append(baseDN).append(" ").append(serverId).append("\",") |
| | | .append(" groupId=").append(groupId) |
| | | .append(" \"").append(getBaseDN()).append(" ") |
| | | .append(getServerId()).append("\",") |
| | | .append(" groupId=").append(getGroupId()) |
| | | .append(", genId=").append(generationID) |
| | | .append(", connected=").append(connected).append(", "); |
| | | if (rsServerId == -1) |