/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE
* or https://OpenDS.dev.java.net/OpenDS.LICENSE.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
* Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.service;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.*;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.MutableBoolean;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.ServerConstants;
/**
* The broker for Multi-master Replication.
*/
public class ReplicationBroker
{
/**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
private volatile boolean shutdown = false;
private final Object startStopLock = new Object();
/**
* Replication server URLs under this format: "hostname:port".
*/
private volatile Collection replicationServerUrls;
private volatile boolean connected = false;
private volatile String replicationServer = "Not connected";
private volatile ProtocolSession session = null;
private final ServerState state;
private final String baseDn;
private final int serverId;
private Semaphore sendWindow;
private int maxSendWindow;
private int rcvWindow = 100;
private int halfRcvWindow = rcvWindow / 2;
private int maxRcvWindow = rcvWindow;
private int timeout = 0;
private short protocolVersion;
private ReplSessionSecurity replSessionSecurity;
/** My group id. */
private byte groupId = -1;
/** The group id of the RS we are connected to. */
private byte rsGroupId = -1;
/** The server id of the RS we are connected to. */
private Integer rsServerId = -1;
/** The server URL of the RS we are connected to. */
private String rsServerUrl = null;
/** Our replication domain. */
private ReplicationDomain domain = null;
/**
* This object is used as a conditional event to be notified about
* the reception of monitor information from the Replication Server.
*/
private final MutableBoolean monitorResponse = new MutableBoolean(false);
/**
* A Map containing the ServerStates of all the replicas in the topology
* as seen by the ReplicationServer the last time it was polled or the last
* time it published monitoring information.
*/
private HashMap replicaStates =
new HashMap();
/**
* The expected duration in milliseconds between heartbeats received
* from the replication server. Zero means heartbeats are off.
*/
private long heartbeatInterval = 0;
/**
* A thread to monitor heartbeats on the session.
*/
private HeartbeatMonitor heartbeatMonitor = null;
/**
* The number of times the connection was lost.
*/
private int numLostConnections = 0;
/**
* When the broker cannot connect to any replication server
* it log an error and keeps continuing every second.
* This boolean is set when the first failure happens and is used
* to avoid repeating the error message for further failure to connect
* and to know that it is necessary to print a new message when the broker
* finally succeed to connect.
*/
private volatile boolean connectionError = false;
private final Object connectPhaseLock = new Object();
/**
* The thread that publishes messages to the RS containing the current
* change time of this DS.
*/
private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null;
/**
* The expected period in milliseconds between these messages are sent
* to the replication server. Zero means heartbeats are off.
*/
private long changeTimeHeartbeatSendInterval = 0;
/*
* Properties for the last topology info received from the network.
*/
// Info for other DSs.
// Warning: does not contain info for us (for our server id)
private volatile List dsList = new ArrayList();
private volatile long generationID;
private volatile int updateDoneCount = 0;
private volatile boolean connectRequiresRecovery = false;
/**
* The map of replication server info initialized at connection time and
* regularly updated. This is used to decide to which best suitable
* replication server one wants to connect. Key: replication server id Value:
* replication server info for the matching replication server id
*/
private volatile Map replicationServerInfos
= null;
/**
* This integer defines when the best replication server checking algorithm
* should be engaged.
* Every time a monitoring message (each monitoring publisher period) is
* received, it is incremented. When it reaches 2, we run the checking
* algorithm to see if we must reconnect to another best replication server.
* Then we reset the value to 0. But when a topology message is received, the
* integer is reseted to 0. This ensures that we wait at least one monitoring
* publisher period before running the algorithm, but also that we wait at
* least for a monitoring period after the last received topology message
* (topology stabilization).
*/
private int mustRunBestServerCheckingAlgorithm = 0;
/**
* Creates a new ReplicationServer Broker for a particular ReplicationDomain.
*
* @param replicationDomain The replication domain that is creating us.
* @param state The ServerState that should be used by this broker
* when negotiating the session with the replicationServer.
* @param baseDn The base DN that should be used by this broker
* when negotiating the session with the replicationServer.
* @param serverID2 The server ID that should be used by this broker
* when negotiating the session with the replicationServer.
* @param window The size of the send and receive window to use.
* @param generationId The generationId for the server associated to the
* provided serverId and for the domain associated to the provided baseDN.
* @param heartbeatInterval The interval (in ms) between heartbeats requested
* from the replicationServer, or zero if no heartbeats are requested.
* @param replSessionSecurity The session security configuration.
* @param groupId The group id of our domain.
* @param changeTimeHeartbeatInterval The interval (in ms) between Change
* time heartbeats are sent to the RS,
* or zero if no CN heartbeat should be sent.
*/
public ReplicationBroker(ReplicationDomain replicationDomain,
ServerState state, String baseDn, int serverID2, int window,
long generationId, long heartbeatInterval,
ReplSessionSecurity replSessionSecurity, byte groupId,
long changeTimeHeartbeatInterval)
{
this.domain = replicationDomain;
this.baseDn = baseDn;
this.serverId = serverID2;
this.state = state;
this.protocolVersion = ProtocolVersion.getCurrentVersion();
this.replSessionSecurity = replSessionSecurity;
this.groupId = groupId;
this.generationID = generationId;
this.heartbeatInterval = heartbeatInterval;
this.rcvWindow = window;
this.maxRcvWindow = window;
this.halfRcvWindow = window / 2;
this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
}
/**
* Start the ReplicationBroker.
*/
public void start()
{
synchronized (startStopLock)
{
shutdown = false;
this.rcvWindow = this.maxRcvWindow;
this.connect();
}
}
/**
* Start the ReplicationBroker.
*
* @param replicationServers list of servers used
*/
public void start(Collection replicationServers)
{
synchronized (startStopLock)
{
// Open Socket to the ReplicationServer Send the Start message
shutdown = false;
this.replicationServerUrls = replicationServers;
if (this.replicationServerUrls.size() < 1)
{
Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
logError(message);
}
this.rcvWindow = this.maxRcvWindow;
this.connect();
}
}
/**
* Gets the group id of the RS we are connected to.
* @return The group id of the RS we are connected to
*/
public byte getRsGroupId()
{
return rsGroupId;
}
/**
* Gets the server id of the RS we are connected to.
* @return The server id of the RS we are connected to
*/
public Integer getRsServerId()
{
return rsServerId;
}
/**
* Gets the server id.
* @return The server id
*/
public int getServerId()
{
return serverId;
}
/**
* Gets the server id.
* @return The server id
*/
private long getGenerationID()
{
if (domain != null)
{
// Update the generation id
generationID = domain.getGenerationID();
}
return generationID;
}
/**
* Set the generation id - for test purpose.
* @param generationID The generation id
*/
public void setGenerationID(long generationID)
{
this.generationID = generationID;
}
/**
* Gets the server url of the RS we are connected to.
* @return The server url of the RS we are connected to
*/
public String getRsServerUrl()
{
return rsServerUrl;
}
/**
* Sets the locally configured flag for the passed ReplicationServerInfo
* object, analyzing the local configuration.
* @param
*/
private void updateRSInfoLocallyConfiguredStatus(
ReplicationServerInfo replicationServerInfo)
{
// Determine if the passed ReplicationServerInfo has a URL that is present
// in the locally configured replication servers
String rsUrl = replicationServerInfo.getServerURL();
if (rsUrl == null)
{
// The ReplicationServerInfo has been generated from a server with
// no URL in TopologyMsg (i.e: with replication protocol version < 4):
// ignore this server as we do not know how to connect to it
replicationServerInfo.setLocallyConfigured(false);
return;
}
for (String serverUrl : replicationServerUrls)
{
if (isSameReplicationServerUrl(serverUrl, rsUrl))
{
// This RS is locally configured, mark this
replicationServerInfo.setLocallyConfigured(true);
replicationServerInfo.serverURL = serverUrl;
return;
}
}
replicationServerInfo.setLocallyConfigured(false);
}
/**
* Compares 2 replication servers addresses and returns true if they both
* represent the same replication server instance.
* @param rs1Url Replication server 1 address
* @param rs2Url Replication server 2 address
* @return True if both replication server addresses represent the same
* replication server instance, false otherwise.
*/
private static boolean isSameReplicationServerUrl(String rs1Url,
String rs2Url)
{
// Get and compare ports of RS1 and RS2
int separator1 = rs1Url.lastIndexOf(':');
if (separator1 < 0)
{
// Not a RS url: should not happen
return false;
}
int rs1Port = Integer.parseInt(rs1Url.substring(separator1 + 1));
int separator2 = rs2Url.lastIndexOf(':');
if (separator2 < 0)
{
// Not a RS url: should not happen
return false;
}
int rs2Port = Integer.parseInt(rs2Url.substring(separator2 + 1));
if (rs1Port != rs2Port)
{
return false;
}
// Get and compare addresses of RS1 and RS2
String rs1 = rs1Url.substring(0, separator1);
InetAddress[] rs1Addresses;
try
{
if (isLocalAddress(rs1))
{
// Replace localhost with the local official hostname
rs1 = InetAddress.getLocalHost().getHostName();
}
rs1Addresses = InetAddress.getAllByName(rs1);
} catch (UnknownHostException ex)
{
// Unknown RS: should not happen
return false;
}
String rs2 = rs2Url.substring(0, separator2);
InetAddress[] rs2Addresses;
try
{
if (isLocalAddress(rs1))
{
// Replace localhost with the local official hostname
rs2 = InetAddress.getLocalHost().getHostName();
}
rs2Addresses = InetAddress.getAllByName(rs2);
} catch (UnknownHostException ex)
{
// Unknown RS: should not happen
return false;
}
// Now compare addresses, if at least one match, this is the same server
for (InetAddress inetAddress1 : rs1Addresses)
{
for (InetAddress inetAddress2 : rs2Addresses)
{
if (inetAddress2.equals(inetAddress1))
{
return true;
}
}
}
return false;
}
/**
* Bag class for keeping info we get from a replication server in order to
* compute the best one to connect to. This is in fact a wrapper to a
* ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be
* updated with a info coming from received topology messages or monitoring
* messages.
*/
public static class ReplicationServerInfo
{
private short protocolVersion;
private long generationId;
private byte groupId = -1;
private int serverId;
// Received server URL
private String serverURL;
private String baseDn = null;
private int windowSize;
private ServerState serverState = null;
private boolean sslEncryption;
private int degradedStatusThreshold = -1;
// Keeps the 1 value if created with a ReplServerStartMsg
private int weight = 1;
// Keeps the 0 value if created with a ReplServerStartMsg
private int connectedDSNumber = 0;
private List connectedDSs = null;
// Is this RS locally configured ? (the RS is recognized as a usable server)
private boolean locallyConfigured = true;
/**
* Create a new instance of ReplicationServerInfo wrapping the passed
* message.
* @param msg Message to wrap.
* @param server Override serverURL.
* @return The new instance wrapping the passed message.
* @throws IllegalArgumentException If the passed message has an unexpected
* type.
*/
public static ReplicationServerInfo newInstance(
ReplicationMsg msg, String server) throws IllegalArgumentException
{
ReplicationServerInfo rsInfo = newInstance(msg);
rsInfo.serverURL = server;
return rsInfo;
}
/**
* Create a new instance of ReplicationServerInfo wrapping the passed
* message.
* @param msg Message to wrap.
* @return The new instance wrapping the passed message.
* @throws IllegalArgumentException If the passed message has an unexpected
* type.
*/
public static ReplicationServerInfo newInstance(
ReplicationMsg msg) throws IllegalArgumentException
{
if (msg instanceof ReplServerStartMsg)
{
// This is a ReplServerStartMsg (RS uses protocol V3 or under)
ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg) msg;
return new ReplicationServerInfo(replServerStartMsg);
} else if (msg instanceof ReplServerStartDSMsg)
{
// This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg) msg;
return new ReplicationServerInfo(replServerStartDSMsg);
}
// Unsupported message type: should not happen
throw new IllegalArgumentException("Unexpected PDU type: " +
msg.getClass().getName() + " :\n" + msg.toString());
}
/**
* Constructs a ReplicationServerInfo object wrapping a
* {@link ReplServerStartMsg}.
*
* @param replServerStartMsg
* The {@link ReplServerStartMsg} this object will wrap.
*/
private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg)
{
this.protocolVersion = replServerStartMsg.getVersion();
this.generationId = replServerStartMsg.getGenerationId();
this.groupId = replServerStartMsg.getGroupId();
this.serverId = replServerStartMsg.getServerId();
this.serverURL = replServerStartMsg.getServerURL();
this.baseDn = replServerStartMsg.getBaseDn();
this.windowSize = replServerStartMsg.getWindowSize();
this.serverState = replServerStartMsg.getServerState();
this.sslEncryption = replServerStartMsg.getSSLEncryption();
this.degradedStatusThreshold =
replServerStartMsg.getDegradedStatusThreshold();
}
/**
* Constructs a ReplicationServerInfo object wrapping a
* {@link ReplServerStartDSMsg}.
*
* @param replServerStartDSMsg
* The {@link ReplServerStartDSMsg} this object will wrap.
*/
private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
{
this.protocolVersion = replServerStartDSMsg.getVersion();
this.generationId = replServerStartDSMsg.getGenerationId();
this.groupId = replServerStartDSMsg.getGroupId();
this.serverId = replServerStartDSMsg.getServerId();
this.serverURL = replServerStartDSMsg.getServerURL();
this.baseDn = replServerStartDSMsg.getBaseDn();
this.windowSize = replServerStartDSMsg.getWindowSize();
this.serverState = replServerStartDSMsg.getServerState();
this.sslEncryption = replServerStartDSMsg.getSSLEncryption();
this.degradedStatusThreshold =
replServerStartDSMsg.getDegradedStatusThreshold();
this.weight = replServerStartDSMsg.getWeight();
this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber();
}
/**
* Get the server state.
* @return The server state
*/
public ServerState getServerState()
{
return serverState;
}
/**
* get the group id.
* @return The group id
*/
public byte getGroupId()
{
return groupId;
}
/**
* Get the server protocol version.
* @return the protocolVersion
*/
public short getProtocolVersion()
{
return protocolVersion;
}
/**
* Get the generation id.
* @return the generationId
*/
public long getGenerationId()
{
return generationId;
}
/**
* Get the server id.
* @return the serverId
*/
public int getServerId()
{
return serverId;
}
/**
* Get the server URL.
* @return the serverURL
*/
public String getServerURL()
{
return serverURL;
}
/**
* Get the base dn.
* @return the baseDn
*/
public String getBaseDn()
{
return baseDn;
}
/**
* Get the window size.
* @return the windowSize
*/
public int getWindowSize()
{
return windowSize;
}
/**
* Get the ssl encryption.
* @return the sslEncryption
*/
public boolean isSslEncryption()
{
return sslEncryption;
}
/**
* Get the degraded status threshold.
* @return the degradedStatusThreshold
*/
public int getDegradedStatusThreshold()
{
return degradedStatusThreshold;
}
/**
* Get the weight.
* @return the weight. Null if this object is a wrapper for
* a ReplServerStartMsg.
*/
public int getWeight()
{
return weight;
}
/**
* Get the connected DS number.
* @return the connectedDSNumber. Null if this object is a wrapper for
* a ReplServerStartMsg.
*/
public int getConnectedDSNumber()
{
return connectedDSNumber;
}
/**
* Constructs a new replication server info with the passed RSInfo
* internal values and the passed connected DSs.
* @param rsInfo The RSinfo to use for the update
* @param connectedDSs The new connected DSs
*/
public ReplicationServerInfo(RSInfo rsInfo, List connectedDSs)
{
this.serverId = rsInfo.getId();
this.serverURL = rsInfo.getServerUrl();
this.generationId = rsInfo.getGenerationId();
this.groupId = rsInfo.getGroupId();
this.weight = rsInfo.getWeight();
this.connectedDSs = connectedDSs;
this.connectedDSNumber = connectedDSs.size();
this.serverState = new ServerState();
}
/**
* Converts the object to a RSInfo object.
* @return The RSInfo object matching this object.
*/
public RSInfo toRSInfo()
{
return new RSInfo(serverId, serverURL, generationId, groupId, weight);
}
/**
* Updates replication server info with the passed RSInfo internal values
* and the passed connected DSs.
* @param rsInfo The RSinfo to use for the update
* @param connectedDSs The new connected DSs
*/
public void update(RSInfo rsInfo, List connectedDSs)
{
this.generationId = rsInfo.getGenerationId();
this.groupId = rsInfo.getGroupId();
this.weight = rsInfo.getWeight();
this.connectedDSs = connectedDSs;
this.connectedDSNumber = connectedDSs.size();
}
/**
* Updates replication server info with the passed server state.
* @param serverState The ServerState to use for the update
*/
public void update(ServerState serverState)
{
if (this.serverState != null)
{
this.serverState.update(serverState);
} else
{
this.serverState = serverState;
}
}
/**
* Get the getConnectedDSs.
* @return the getConnectedDSs
*/
public List getConnectedDSs()
{
return connectedDSs;
}
/**
* Gets the locally configured status for this RS.
* @return the locallyConfigured
*/
public boolean isLocallyConfigured()
{
return locallyConfigured;
}
/**
* Sets the locally configured status for this RS.
* @param locallyConfigured the locallyConfigured to set
*/
public void setLocallyConfigured(boolean locallyConfigured)
{
this.locallyConfigured = locallyConfigured;
}
/**
* Returns a string representation of this object.
* @return A string representation of this object.
*/
@Override
public String toString()
{
return "Url:" + this.serverURL + " ServerId:" + this.serverId
+ " GroupId:" + this.groupId;
}
}
private void connect()
{
if (this.baseDn.compareToIgnoreCase(
ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT) == 0)
{
connectAsECL();
} else
{
connectAsDataServer();
}
}
/**
* Contacts all replication servers to get information from them and being
* able to choose the more suitable.
* @return the collected information.
*/
private Map collectReplicationServersInfo()
{
Map rsInfos =
new ConcurrentHashMap();
for (String serverUrl : replicationServerUrls)
{
// Connect to server and get info about it
ReplicationServerInfo replicationServerInfo =
performPhaseOneHandshake(serverUrl, false, false);
// Store server info in list
if (replicationServerInfo != null)
{
rsInfos.put(replicationServerInfo.getServerId(), replicationServerInfo);
}
}
return rsInfos;
}
/**
* Special aspects of connecting as ECL (External Change Log) compared to
* connecting as data server are :
*
*/
private void connectAsECL()
{
// FIXME:ECL List of RS to connect is for now limited to one RS only
String bestServer = this.replicationServerUrls.iterator().next();
if (performPhaseOneHandshake(bestServer, true, true) != null)
{
performECLPhaseTwoHandshake(bestServer);
}
}
/**
* Connect to a ReplicationServer.
*
* Handshake sequences between a DS and a RS is divided into 2 logical
* consecutive phases (phase 1 and phase 2). DS always initiates connection
* and always sends first message:
*
* DS<->RS:
* -------
*
* phase 1:
* DS --- ServerStartMsg ---> RS
* DS <--- ReplServerStartDSMsg --- RS
* phase 2:
* DS --- StartSessionMsg ---> RS
* DS <--- TopologyMsg --- RS
*
* Before performing a full handshake sequence, DS searches for best suitable
* RS by making only phase 1 handshake to every RS he knows then closing
* connection. This allows to gather information on available RSs and then
* decide with which RS the full handshake (phase 1 then phase 2) will be
* finally performed.
*
* @throws NumberFormatException address was invalid
*/
private void connectAsDataServer()
{
// May have created a broker with null replication domain for
// unit test purpose.
if (domain != null)
{
// If a first connect or a connection failure occur, we go through here.
// force status machine to NOT_CONNECTED_STATUS so that monitoring can
// see that we are not connected.
domain.toNotConnectedStatus();
}
// Stop any existing poller and heartbeat monitor from a previous session.
stopRSHeartBeatMonitoring();
stopChangeTimeHeartBeatPublishing();
mustRunBestServerCheckingAlgorithm = 0;
synchronized (connectPhaseLock)
{
/*
* Connect to each replication server and get their ServerState then find
* out which one is the best to connect to.
*/
if (debugEnabled())
debugInfo("serverId: " + serverId +
" phase 1 : will perform PhaseOneH with each RS in " +
" order to elect the preferred one");
// Get info from every available replication servers
replicationServerInfos = collectReplicationServersInfo();
ReplicationServerInfo electedRsInfo = null;
if (replicationServerInfos.size() > 0)
{
// At least one server answered, find the best one.
electedRsInfo = computeBestReplicationServer(true, -1, state,
replicationServerInfos, serverId, baseDn, groupId, getGenerationID());
// Best found, now initialize connection to this one (handshake phase 1)
if (debugEnabled())
debugInfo("serverId: " + serverId +
" phase 2 : will perform PhaseOneH with the preferred RS="
+ electedRsInfo);
electedRsInfo = performPhaseOneHandshake(
electedRsInfo.getServerURL(), true, false);
if (electedRsInfo != null)
{
// Update replication server info with potentially more up to date
// data (server state for instance may have changed)
replicationServerInfos
.put(electedRsInfo.getServerId(), electedRsInfo);
// Handshake phase 1 exchange went well
// Compute in which status we are starting the session to tell the RS
ServerStatus initStatus =
computeInitialServerStatus(electedRsInfo.getGenerationId(),
electedRsInfo.getServerState(),
electedRsInfo.getDegradedStatusThreshold(),
getGenerationID());
// Perform session start (handshake phase 2)
TopologyMsg topologyMsg = performPhaseTwoHandshake(
electedRsInfo.getServerURL(), initStatus);
if (topologyMsg != null) // Handshake phase 2 exchange went well
{
connectToReplicationServer(electedRsInfo, initStatus, topologyMsg);
} // Could perform handshake phase 2 with best
} // Could perform handshake phase 1 with best
} // Reached some servers
if (connected)
{
connectPhaseLock.notify();
if ((electedRsInfo.getGenerationId() == getGenerationID())
|| (electedRsInfo.getGenerationId() == -1))
{
Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
.get(serverId, rsServerId, baseDn,
session.getReadableRemoteAddress(),
getGenerationID());
logError(message);
} else
{
Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG
.get(serverId, rsServerId, baseDn,
session.getReadableRemoteAddress(),
getGenerationID(),
electedRsInfo.getGenerationId());
logError(message);
}
} else
{
/*
* This server could not find any replicationServer. It's going to start
* in degraded mode. Log a message.
*/
if (!connectionError)
{
connectionError = true;
connectPhaseLock.notify();
if (replicationServerInfos.size() > 0)
{
Message message = WARN_COULD_NOT_FIND_CHANGELOG.get(
serverId,
baseDn,
collectionToString(replicationServerInfos.keySet(),
", "));
logError(message);
}
else
{
Message message = WARN_NO_AVAILABLE_CHANGELOGS.get(
serverId, baseDn);
logError(message);
}
}
}
}
}
/**
* Connects to a replication server.
*
* @param rsInfo
* the Replication Server to connect to
* @param initStatus
* The status to enter the state machine with
* @param topologyMsg
* the message containing the topology information
*/
private void connectToReplicationServer(ReplicationServerInfo rsInfo,
ServerStatus initStatus, TopologyMsg topologyMsg)
{
try
{
replicationServer = session.getReadableRemoteAddress();
maxSendWindow = rsInfo.getWindowSize();
rsGroupId = rsInfo.getGroupId();
rsServerId = rsInfo.getServerId();
rsServerUrl = rsInfo.getServerURL();
receiveTopo(topologyMsg);
// Log a message to let the administrator know that the failure
// was resolved.
// Wakeup all the thread that were waiting on the window
// on the previous connection.
connectionError = false;
if (sendWindow != null)
{
/*
* Fix (hack) for OPENDJ-401: we want to ensure that no threads holding
* this semaphore will get blocked when they acquire it. However, we
* also need to make sure that we don't overflow the semaphore by
* releasing too many permits.
*/
final int MAX_PERMITS = (Integer.MAX_VALUE >>> 2);
if (sendWindow.availablePermits() < MAX_PERMITS)
{
/*
* At least 2^29 acquisitions would need to occur for this to be
* insufficient. In addition, at least 2^30 releases would need to
* occur for this to potentially overflow. Hopefully this is unlikely
* to happen.
*/
sendWindow.release(MAX_PERMITS);
}
}
sendWindow = new Semaphore(maxSendWindow);
rcvWindow = maxRcvWindow;
connected = true;
// May have created a broker with null replication domain for
// unit test purpose.
if (domain != null)
{
domain.sessionInitiated(initStatus, rsInfo.getServerState(), rsInfo
.getGenerationId(), session);
}
if (getRsGroupId() != groupId)
{
// Connected to replication server with wrong group id:
// warn user and start poller to recover when a server with
// right group id arrives...
Message message =
WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(Byte
.toString(groupId), Integer.toString(rsServerId), rsInfo
.getServerURL(), Byte.toString(getRsGroupId()), baseDn, Integer
.toString(serverId));
logError(message);
}
startRSHeartBeatMonitoring();
if (rsInfo.getProtocolVersion() >=
ProtocolVersion.REPLICATION_PROTOCOL_V3)
{
startChangeTimeHeartBeatPublishing();
}
}
catch (Exception e)
{
Message message =
ERR_COMPUTING_FAKE_OPS.get(baseDn, rsInfo.getServerURL(), e
.getLocalizedMessage()
+ stackTraceToSingleLineString(e));
logError(message);
}
finally
{
if (connected == false)
{
ProtocolSession localSession = session;
if (localSession != null)
{
localSession.close();
session = null;
}
}
}
}
/**
* Determines the status we are starting with according to our state and the
* RS state.
*
* @param rsGenId The generation id of the RS
* @param rsState The server state of the RS
* @param degradedStatusThreshold The degraded status threshold of the RS
* @param dsGenId The local generation id
* @return The initial status
*/
public ServerStatus computeInitialServerStatus(long rsGenId,
ServerState rsState, int degradedStatusThreshold, long dsGenId)
{
if (rsGenId == -1)
{
// RS has no generation id
return ServerStatus.NORMAL_STATUS;
} else
{
if (rsGenId == dsGenId)
{
// DS and RS have same generation id
// Determine if we are late or not to replay changes. RS uses a
// threshold value for pending changes to be replayed by a DS to
// determine if the DS is in normal status or in degraded status.
// Let's compare the local and remote server state using this threshold
// value to determine if we are late or not
ServerStatus initStatus = ServerStatus.INVALID_STATUS;
int nChanges = ServerState.diffChanges(rsState, state);
if (debugEnabled())
{
debugInfo("RB for dn " + baseDn +
" and with server id " + Integer.toString(serverId) + " computed " +
Integer.toString(nChanges) + " changes late.");
}
// Check status to know if it is relevant to change the status. Do not
// take RSD lock to test. If we attempt to change the status whereas
// we are in a status that do not allows that, this will be noticed by
// the changeStatusFromStatusAnalyzer method. This allows to take the
// lock roughly only when needed versus every sleep time timeout.
if (degradedStatusThreshold > 0)
{
if (nChanges >= degradedStatusThreshold)
{
initStatus = ServerStatus.DEGRADED_STATUS;
} else
{
initStatus = ServerStatus.NORMAL_STATUS;
}
} else
{
// 0 threshold value means no degrading system used (no threshold):
// force normal status
initStatus = ServerStatus.NORMAL_STATUS;
}
return initStatus;
} else
{
// DS and RS do not have same generation id
return ServerStatus.BAD_GEN_ID_STATUS;
}
}
}
/**
* Connect to the provided server performing the first phase handshake (start
* messages exchange) and return the reply message from the replication
* server, wrapped in a ReplicationServerInfo object.
*
* @param server
* Server to connect to.
* @param keepConnection
* Do we keep session opened or not after handshake. Use true if want
* to perform handshake phase 2 with the same session and keep the
* session to create as the current one.
* @param isECL
* Indicates whether or not the an ECL handshake is to be performed.
* @return The answer from the server . Null if could not get an answer.
*/
private ReplicationServerInfo performPhaseOneHandshake(
String server, boolean keepConnection, boolean isECL)
{
int separator = server.lastIndexOf(':');
String port = server.substring(separator + 1);
String hostname = server.substring(0, separator);
ProtocolSession localSession = null;
Socket socket = null;
boolean hasConnected = false;
Message errorMessage = null;
try
{
/*
* Open a socket connection to the next candidate.
*/
int intPort = Integer.parseInt(port);
InetSocketAddress serverAddr = new InetSocketAddress(
InetAddress.getByName(hostname), intPort);
socket = new Socket();
socket.setReceiveBufferSize(1000000);
socket.setTcpNoDelay(true);
int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
socket.connect(serverAddr, timeoutMS);
localSession = replSessionSecurity.createClientSession(socket, timeoutMS);
boolean isSslEncryption = replSessionSecurity
.isSslEncryption(server);
// Send our ServerStartMsg.
StartMsg serverStartMsg;
if (!isECL)
{
serverStartMsg = new ServerStartMsg(serverId, baseDn,
maxRcvWindow, heartbeatInterval, state,
ProtocolVersion.getCurrentVersion(),
this.getGenerationID(), isSslEncryption, groupId);
}
else
{
serverStartMsg = new ServerStartECLMsg(baseDn, 0, 0, 0, 0,
maxRcvWindow, heartbeatInterval, state,
ProtocolVersion.getCurrentVersion(),
this.getGenerationID(), isSslEncryption, groupId);
}
localSession.publish(serverStartMsg);
// Read the ReplServerStartMsg or ReplServerStartDSMsg that should
// come back.
ReplicationMsg msg = localSession.receive();
if (debugEnabled())
{
debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
+ serverStartMsg.toString() + "\nAND RECEIVED:\n"
+ msg.toString());
}
// Wrap received message in a server info object
ReplicationServerInfo replServerInfo = ReplicationServerInfo
.newInstance(msg, server);
// Sanity check
String repDn = replServerInfo.getBaseDn();
if (!(this.baseDn.equals(repDn)))
{
errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn,
this.baseDn);
return null;
}
/*
* We have sent our own protocol version to the replication server. The
* replication server will use the same one (or an older one if it is an
* old replication server).
*/
final short localProtocolVersion = ProtocolVersion
.minWithCurrent(replServerInfo.getProtocolVersion());
if (keepConnection)
{
protocolVersion = localProtocolVersion;
}
localSession.setProtocolVersion(localProtocolVersion);
if (!isSslEncryption)
{
localSession.stopEncryption();
}
hasConnected = true;
// If this connection as the one to use for sending and receiving
// updates, store it.
if (keepConnection)
{
session = localSession;
}
return replServerInfo;
}
catch (ConnectException e)
{
errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId,
server, baseDn);
return null;
}
catch (SocketTimeoutException e)
{
errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(serverId,
server, baseDn);
return null;
}
catch (Exception e)
{
errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
server, baseDn, stackTraceToSingleLineString(e));
return null;
}
finally
{
if (!hasConnected || !keepConnection)
{
if (localSession != null)
{
localSession.close();
}
if (socket != null)
{
try
{
socket.close();
}
catch (IOException e)
{
// Ignore.
}
}
}
if (!hasConnected && errorMessage != null)
{
// There was no server waiting on this host:port Log a notice and try
// the next replicationServer in the list
if (!connectionError)
{
if (keepConnection) // Log error message only for final connection
{
// the error message is only logged once to avoid overflowing
// the error log
logError(errorMessage);
}
if (debugEnabled())
{
debugInfo(errorMessage.toString());
}
}
}
}
}
/**
* Performs the second phase handshake for External Change Log (send
* StartSessionMsg and receive TopologyMsg messages exchange) and return the
* reply message from the replication server.
*
* @param server Server we are connecting with.
* @param initStatus The status we are starting with
* @return The ReplServerStartMsg the server replied. Null if could not
* get an answer.
*/
private TopologyMsg performECLPhaseTwoHandshake(String server)
{
TopologyMsg topologyMsg = null;
try
{
// Send our Start Session
StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
startECLSessionMsg.setOperationId("-1");
session.publish(startECLSessionMsg);
/* FIXME:ECL In the handshake phase two, should RS send back a topo msg ?
* Read the TopologyMsg that should come back.
topologyMsg = (TopologyMsg) session.receive();
*/
if (debugEnabled())
{
debugInfo("In RB for " + baseDn +
"\nRB HANDSHAKE SENT:\n" + startECLSessionMsg.toString());
// + "\nAND RECEIVED:\n" + topologyMsg.toString());
}
// Alright set the timeout to the desired value
session.setSoTimeout(timeout);
connected = true;
} catch (Exception e)
{
Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
server, baseDn, stackTraceToSingleLineString(e));
logError(message);
if (session != null)
{
session.close();
session = null;
}
// Be sure to return null.
topologyMsg = null;
}
return topologyMsg;
}
/**
* Performs the second phase handshake (send StartSessionMsg and receive
* TopologyMsg messages exchange) and return the reply message from the
* replication server.
*
* @param server Server we are connecting with.
* @param initStatus The status we are starting with
* @return The ReplServerStartMsg the server replied. Null if could not
* get an answer.
*/
private TopologyMsg performPhaseTwoHandshake(String server,
ServerStatus initStatus)
{
TopologyMsg topologyMsg;
try
{
/*
* Send our StartSessionMsg.
*/
StartSessionMsg startSessionMsg;
// May have created a broker with null replication domain for
// unit test purpose.
if (domain != null)
{
startSessionMsg =
new StartSessionMsg(
initStatus,
domain.getRefUrls(),
domain.isAssured(),
domain.getAssuredMode(),
domain.getAssuredSdLevel());
startSessionMsg.setEclIncludes(
domain.getEclIncludes(domain.getServerId()),
domain.getEclIncludesForDeletes(domain.getServerId()));
}
else
{
startSessionMsg =
new StartSessionMsg(initStatus, new ArrayList());
}
session.publish(startSessionMsg);
/*
* Read the TopologyMsg that should come back.
*/
topologyMsg = (TopologyMsg) session.receive();
if (debugEnabled())
{
debugInfo("In RB for " + baseDn +
"\nRB HANDSHAKE SENT:\n" + startSessionMsg.toString() +
"\nAND RECEIVED:\n" + topologyMsg.toString());
}
// Alright set the timeout to the desired value
session.setSoTimeout(timeout);
} catch (Exception e)
{
Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
server, baseDn, stackTraceToSingleLineString(e));
logError(message);
if (session != null)
{
session.close();
session = null;
}
// Be sure to return null.
topologyMsg = null;
}
return topologyMsg;
}
/**
* Returns the replication server that best fits our need so that we can
* connect to it or determine if we must disconnect from current one to
* re-connect to best server.
*
* Note: this method is static for test purpose (access from unit tests)
*
* @param firstConnection True if we run this method for the very first
* connection of the broker. False if we run this method to determine if the
* replication server we are currently connected to is still the best or not.
* @param rsServerId The id of the replication server we are currently
* connected to. Only used when firstConnection is false.
* @param myState The local server state.
* @param rsInfos The list of available replication servers and their
* associated information (choice will be made among them).
* @param localServerId The server id for the suffix we are working for.
* @param baseDn The suffix for which we are working for.
* @param groupId The groupId we prefer being connected to if possible
* @param generationId The generation id we are using
* @return The computed best replication server. If the returned value is
* null, the best replication server is undetermined but the local server must
* disconnect (so the best replication server is another one than the current
* one). Null can only be returned when firstConnection is false.
*/
public static ReplicationServerInfo computeBestReplicationServer(
boolean firstConnection, int rsServerId, ServerState myState,
Map rsInfos, int localServerId,
String baseDn, byte groupId, long generationId)
{
// Shortcut, if only one server, this is the best
if (rsInfos.size() == 1)
{
return rsInfos.values().iterator().next();
}
/**
* Apply some filtering criteria to determine the best servers list from
* the available ones. The ordered list of criteria is (from more important
* to less important):
* - replication server has the same group id as the local DS one
* - replication server has the same generation id as the local DS one
* - replication server is up to date regarding changes generated by the
* local DS
* - replication server in the same VM as local DS one
*/
Map bestServers = rsInfos;
// The list of best replication servers is filtered with each criteria. At
// each criteria, the list is replaced with the filtered one if there
// are some servers from the filtering, otherwise, the list is left as is
// and the new filtering for the next criteria is applied and so on.
// Use only servers locally configured: those are servers declared in
// the local configuration. When the current method is called, for
// sure, at least one server from the list is locally configured
bestServers =
keepBest(filterServersLocallyConfigured(bestServers), bestServers);
// Some servers with same group id ?
bestServers =
keepBest(filterServersWithSameGroupId(bestServers, groupId),
bestServers);
// Some servers with same generation id ?
Map sameGenerationId =
filterServersWithSameGenerationId(bestServers, generationId);
if (sameGenerationId.size() > 0)
{
// If some servers with the right generation id this is useful to
// run the local DS change criteria
bestServers =
keepBest(filterServersWithAllLocalDSChanges(sameGenerationId,
myState, localServerId), sameGenerationId);
}
// Some servers in the local VM ?
bestServers = keepBest(filterServersInSameVM(bestServers), bestServers);
/**
* Now apply the choice base on the weight to the best servers list
*/
if (bestServers.size() > 1)
{
if (firstConnection)
{
// We are no connected to a server yet
return computeBestServerForWeight(bestServers, -1, -1);
} else
{
// We are already connected to a RS: compute the best RS as far as the
// weights is concerned. If this is another one, some DS must
// disconnect.
return computeBestServerForWeight(bestServers, rsServerId,
localServerId);
}
} else
{
return bestServers.values().iterator().next();
}
}
/**
* If the filtered Map is not empty then it is returned, else return the
* original unfiltered Map.
*
* @return the best fit Map between the filtered Map and the original
* unfiltered Map.
*/
private static Map keepBest(Map filteredMap,
Map unfilteredMap)
{
if (!filteredMap.isEmpty())
{
return filteredMap;
}
return unfilteredMap;
}
/**
* Creates a new list that contains only replication servers that are locally
* configured.
* @param bestServers The list of replication servers to filter
* @return The sub list of replication servers locally configured
*/
private static Map
filterServersLocallyConfigured(Map bestServers)
{
Map result =
new HashMap();
for (Integer rsId : bestServers.keySet())
{
ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
if (replicationServerInfo.isLocallyConfigured())
{
result.put(rsId, replicationServerInfo);
}
}
return result;
}
/**
* Creates a new list that contains only replication servers that have the
* passed group id, from a passed replication server list.
* @param bestServers The list of replication servers to filter
* @param groupId The group id that must match
* @return The sub list of replication servers matching the requested group id
* (which may be empty)
*/
private static Map
filterServersWithSameGroupId(Map bestServers, byte groupId)
{
Map result =
new HashMap();
for (Integer rsId : bestServers.keySet())
{
ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
if (replicationServerInfo.getGroupId() == groupId)
{
result.put(rsId, replicationServerInfo);
}
}
return result;
}
/**
* Creates a new list that contains only replication servers that have the
* provided generation id, from a provided replication server list.
* When the selected replication servers have no change (empty serverState)
* then the 'empty'(generationId==-1) replication servers are also included
* in the result list.
*
* @param bestServers The list of replication servers to filter
* @param generationId The generation id that must match
* @return The sub list of replication servers matching the requested
* generation id (which may be empty)
*/
private static Map
filterServersWithSameGenerationId(Map bestServers, long generationId)
{
Map result =
new HashMap();
boolean emptyState = true;
for (Integer rsId : bestServers.keySet())
{
ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
if (replicationServerInfo.getGenerationId() == generationId)
{
result.put(rsId, replicationServerInfo);
if (!replicationServerInfo.serverState.isEmpty())
emptyState = false;
}
}
if (emptyState)
{
// If the RS with a generationId have all an empty state,
// then the 'empty'(genId=-1) RSes are also candidate
for (Integer rsId : bestServers.keySet())
{
ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
if (replicationServerInfo.getGenerationId() == -1)
{
result.put(rsId, replicationServerInfo);
}
}
}
return result;
}
/**
* Creates a new list that contains only replication servers that have the
* latest changes from the passed DS, from a passed replication server list.
* @param bestServers The list of replication servers to filter
* @param localState The state of the local DS
* @param localServerId The server id to consider for the changes
* @return The sub list of replication servers that have the latest changes
* from the passed DS (which may be empty)
*/
private static Map
filterServersWithAllLocalDSChanges(Map bestServers, ServerState localState,
int localServerId)
{
Map upToDateServers =
new HashMap();
Map moreUpToDateServers =
new HashMap();
// Extract the change number of the latest change generated by the local
// server
ChangeNumber myChangeNumber = localState.getMaxChangeNumber(localServerId);
if (myChangeNumber == null)
{
myChangeNumber = new ChangeNumber(0, 0, localServerId);
}
/**
* Find replication servers who are up to date (or more up to date than us,
* if for instance we failed and restarted, having sent some changes to the
* RS but without having time to store our own state) regarding our own
* server id. If some servers more up to date, prefer this list but take
* only the latest change number.
*/
ChangeNumber latestRsChangeNumber = null;
for (Integer rsId : bestServers.keySet())
{
ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
ServerState rsState = replicationServerInfo.getServerState();
ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerId);
if (rsChangeNumber == null)
{
rsChangeNumber = new ChangeNumber(0, 0, localServerId);
}
// Has this replication server the latest local change ?
if (myChangeNumber.olderOrEqual(rsChangeNumber))
{
if (myChangeNumber.equals(rsChangeNumber))
{
// This replication server has exactly the latest change from the
// local server
upToDateServers.put(rsId, replicationServerInfo);
} else
{
// This replication server is even more up to date than the local
// server
if (latestRsChangeNumber == null)
{
// Initialize the latest change number
latestRsChangeNumber = rsChangeNumber;
}
if (rsChangeNumber.newerOrEquals(latestRsChangeNumber))
{
if (rsChangeNumber.equals(latestRsChangeNumber))
{
moreUpToDateServers.put(rsId, replicationServerInfo);
} else
{
// This RS is even more up to date, clear the list and store this
// new RS
moreUpToDateServers.clear();
moreUpToDateServers.put(rsId, replicationServerInfo);
latestRsChangeNumber = rsChangeNumber;
}
}
}
}
}
if (moreUpToDateServers.size() > 0)
{
// Prefer servers more up to date than local server
return moreUpToDateServers;
} else
{
return upToDateServers;
}
}
/**
* Creates a new list that contains only replication servers that are in the
* same VM as the local DS, from a passed replication server list.
* @param bestServers The list of replication servers to filter
* @return The sub list of replication servers being in the same VM as the
* local DS (which may be empty)
*/
private static Map filterServersInSameVM(
Map bestServers)
{
Map result =
new HashMap();
for (Integer rsId : bestServers.keySet())
{
ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
if (ReplicationServer.isLocalReplicationServer(
replicationServerInfo.getServerURL()))
{
result.put(rsId, replicationServerInfo);
}
}
return result;
}
/**
* Computes the best replication server the local server should be connected
* to so that the load is correctly spread across the topology, following the
* weights guidance.
* Warning: This method is expected to be called with at least 2 servers in
* bestServers
* Note: this method is static for test purpose (access from unit tests)
* @param bestServers The list of replication servers to consider
* @param currentRsServerId The replication server the local server is
* currently connected to. -1 if the local server is not yet connected
* to any replication server.
* @param localServerId The server id of the local server. This is not used
* when it is not connected to a replication server
* (currentRsServerId = -1)
* @return The replication server the local server should be connected to
* as far as the weight is concerned. This may be the currently used one if
* the weight is correctly spread. If the returned value is null, the best
* replication server is undetermined but the local server must disconnect
* (so the best replication server is another one than the current one).
*/
public static ReplicationServerInfo computeBestServerForWeight(
Map bestServers, int currentRsServerId,
int localServerId)
{
/*
* - Compute the load goal of each RS, deducing it from the weights affected
* to them.
* - Compute the current load of each RS, deducing it from the DSs
* currently connected to them.
* - Compute the differences between the load goals and the current loads of
* the RSs.
*/
// Sum of the weights
int sumOfWeights = 0;
// Sum of the connected DSs
int sumOfConnectedDSs = 0;
for (ReplicationServerInfo replicationServerInfo : bestServers.values())
{
sumOfWeights += replicationServerInfo.getWeight();
sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
}
// Distance (difference) of the current loads to the load goals of each RS:
// key:server id, value: distance
Map loadDistances = new HashMap();
// Precision for the operations (number of digits after the dot)
MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
for (Integer rsId : bestServers.keySet())
{
ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
int rsWeight = replicationServerInfo.getWeight();
// load goal = rs weight / sum of weights
BigDecimal loadGoalBd = BigDecimal.valueOf(rsWeight).divide(
BigDecimal.valueOf(sumOfWeights), mathContext);
BigDecimal currentLoadBd = BigDecimal.ZERO;
if (sumOfConnectedDSs != 0)
{
// current load = number of connected DSs / total number of DSs
int connectedDSs = replicationServerInfo.getConnectedDSNumber();
currentLoadBd = BigDecimal.valueOf(connectedDSs).divide(
BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
}
// load distance = load goal - current load
BigDecimal loadDistanceBd =
loadGoalBd.subtract(currentLoadBd, mathContext);
loadDistances.put(rsId, loadDistanceBd);
}
if (currentRsServerId == -1)
{
// The local server is not connected yet
/*
* Find the server with the current highest distance to its load goal and
* choose it. Make an exception if every server is correctly balanced,
* that is every current load distances are equal to 0, in that case,
* choose the server with the highest weight
*/
int bestRsId = 0; // If all server equal, return the first one
float highestDistance = Float.NEGATIVE_INFINITY;
boolean allRsWithZeroDistance = true;
int highestWeightRsId = -1;
int highestWeight = -1;
for (Integer rsId : bestServers.keySet())
{
float loadDistance = loadDistances.get(rsId).floatValue();
if (loadDistance > highestDistance)
{
// This server is far more from its balance point
bestRsId = rsId;
highestDistance = loadDistance;
}
if (loadDistance != 0)
{
allRsWithZeroDistance = false;
}
int weight = bestServers.get(rsId).getWeight();
if (weight > highestWeight)
{
// This server has a higher weight
highestWeightRsId = rsId;
highestWeight = weight;
}
}
// All servers with a 0 distance ?
if (allRsWithZeroDistance)
{
// Choose server with the highest weight
bestRsId = highestWeightRsId;
}
return bestServers.get(bestRsId);
} else
{
// The local server is currently connected to a RS, let's see if it must
// disconnect or not, taking the weights into account.
float currentLoadDistance =
loadDistances.get(currentRsServerId).floatValue();
if (currentLoadDistance < 0)
{
// Too much DSs connected to the current RS, compared with its load
// goal:
// Determine the potential number of DSs to disconnect from the current
// RS and see if the local DS is part of them: the DSs that must
// disconnect are those with the lowest server id.
// Compute the sum of the distances of the load goals of the other RSs
BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
for (Integer rsId : bestServers.keySet())
{
if (rsId != currentRsServerId)
{
sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
loadDistances.get(rsId), mathContext);
}
}
if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
{
// The average distance of the other RSs shows a lack of DSs.
// Compute the number of DSs to disconnect from the current RS,
// rounding to the nearest integer number. Do only this if there is
// no risk of yoyo effect: when the exact balance cannot be
// established due to the current number of DSs connected, do not
// disconnect a DS. A simple example where the balance cannot be
// reached is:
// - RS1 has weight 1 and 2 DSs
// - RS2 has weight 1 and 1 DS
// => disconnecting a DS from RS1 to reconnect it to RS2 would have no
// sense as this would lead to the reverse situation. In that case,
// the perfect balance cannot be reached and we must stick to the
// current situation, otherwise the DS would keep move between the 2
// RSs
float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
.floatValue();
int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
// Avoid yoyo effect
if (overloadingDSsNumber == 1)
{
// What would be the new load distance for the current RS if
// we disconnect some DSs ?
ReplicationServerInfo currentReplicationServerInfo =
bestServers.get(currentRsServerId);
int currentRsWeight = currentReplicationServerInfo.getWeight();
BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
BigDecimal currentRsLoadGoalBd =
currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
if (sumOfConnectedDSs != 0)
{
int connectedDSs = currentReplicationServerInfo.
getConnectedDSNumber();
BigDecimal potentialNewConnectedDSsBd =
BigDecimal.valueOf(connectedDSs - 1);
BigDecimal sumOfConnectedDSsBd =
BigDecimal.valueOf(sumOfConnectedDSs);
potentialCurrentRsNewLoadBd =
potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
mathContext);
}
BigDecimal potentialCurrentRsNewLoadDistanceBd =
currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
mathContext);
// What would be the new load distance for the other RSs ?
BigDecimal additionalDsLoadBd =
BigDecimal.ONE.divide(
BigDecimal.valueOf(sumOfConnectedDSs),mathContext);
BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
mathContext);
// Now compare both values: we must no disconnect the DS if this
// is for going in a situation where the load distance of the other
// RSs is the opposite of the future load distance of the local RS
// or we would evaluate that we should disconnect just after being
// arrived on the new RS. But we should disconnect if we reach the
// perfect balance (both values are 0).
MathContext roundMc =
new MathContext(6, RoundingMode.DOWN);
BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
potentialCurrentRsNewLoadDistanceBd.round(roundMc);
BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
BigDecimal.ZERO) != 0)
&& (potentialCurrentRsNewLoadDistanceBdRounded.equals(
potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
{
// Avoid the yoyo effect, and keep the local DS connected to its
// current RS
return bestServers.get(currentRsServerId);
}
}
// Prepare a sorted list (from lowest to highest) or DS server ids
// connected to the current RS
ReplicationServerInfo currentRsInfo =
bestServers.get(currentRsServerId);
List serversConnectedToCurrentRS =
currentRsInfo.getConnectedDSs();
List sortedServers = new ArrayList(
serversConnectedToCurrentRS);
Collections.sort(sortedServers);
// Go through the list of DSs to disconnect and see if the local
// server is part of them.
int index = 0;
while (overloadingDSsNumber > 0)
{
int severToDisconnectId = sortedServers.get(index);
if (severToDisconnectId == localServerId)
{
// The local server is part of the DSs to disconnect
return null;
}
overloadingDSsNumber--;
index++;
}
// The local server is not part of the servers to disconnect from the
// current RS.
return bestServers.get(currentRsServerId);
} else
{
// The average distance of the other RSs does not show a lack of DSs:
// no need to disconnect any DS from the current RS.
return bestServers.get(currentRsServerId);
}
} else
{
// The RS load goal is reached or there are not enough DSs connected to
// it to reach it: do not disconnect from this RS and return rsInfo for
// this RS
return bestServers.get(currentRsServerId);
}
}
}
/**
* Start the heartbeat monitor thread.
*/
private void startRSHeartBeatMonitoring()
{
// Start a heartbeat monitor thread.
if (heartbeatInterval > 0)
{
heartbeatMonitor = new HeartbeatMonitor(getServerId(),
getRsServerId(), baseDn, session, heartbeatInterval);
heartbeatMonitor.start();
}
}
/**
* Stop the heartbeat monitor thread.
*/
synchronized void stopRSHeartBeatMonitoring()
{
if (heartbeatMonitor != null)
{
heartbeatMonitor.shutdown();
heartbeatMonitor = null;
}
}
/**
* restart the ReplicationBroker.
* @param infiniteTry the socket which failed
*/
public void reStart(boolean infiniteTry)
{
reStart(session, infiniteTry);
}
/**
* Restart the ReplicationServer broker after a failure.
*
* @param failingSession the socket which failed
* @param infiniteTry the socket which failed
*/
public void reStart(ProtocolSession failingSession, boolean infiniteTry)
{
if (failingSession != null)
{
failingSession.close();
numLostConnections++;
}
if (failingSession == session)
{
connected = false;
rsGroupId = -1;
rsServerId = -1;
rsServerUrl = null;
session = null;
}
while (true)
{
// Synchronize inside the loop in order to allow shutdown.
boolean needSleep = false;
synchronized (startStopLock)
{
if (connected || shutdown)
{
break;
}
try
{
connect();
}
catch (Exception e)
{
MessageBuilder mb = new MessageBuilder();
mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(baseDn,
e.getLocalizedMessage()));
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
}
if (connected || !infiniteTry)
{
break;
}
needSleep = true;
}
if (needSleep)
{
try
{
Thread.sleep(500);
}
catch (InterruptedException e)
{
// ignore
}
}
}
if (debugEnabled())
{
debugInfo(this + " end restart : connected=" + connected + " with RSid="
+ this.getRsServerId() + " genid=" + this.generationID);
}
}
/**
* Publish a message to the other servers.
* @param msg the message to publish
*/
public void publish(ReplicationMsg msg)
{
_publish(msg, false, true);
}
/**
* Publish a message to the other servers.
* @param msg The message to publish.
* @param retryOnFailure Whether reconnect should automatically be done.
* @return Whether publish succeeded.
*/
public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
{
return _publish(msg, false, retryOnFailure);
}
/**
* Publish a recovery message to the other servers.
* @param msg the message to publish
*/
public void publishRecovery(ReplicationMsg msg)
{
_publish(msg, true, true);
}
/**
* Publish a message to the other servers.
* @param msg the message to publish
* @param recoveryMsg the message is a recovery Message
* @param retryOnFailure whether retry should be done on failure
* @return whether the message was successfully sent.
*/
boolean _publish(ReplicationMsg msg, boolean recoveryMsg,
boolean retryOnFailure)
{
boolean done = false;
while (!done && !shutdown)
{
if (connectionError)
{
// It was not possible to connect to any replication server.
// Since the operation was already processed, we have no other
// choice than to return without sending the ReplicationMsg
// and relying on the resend procedure of the connect phase to
// fix the problem when we finally connect.
if (debugEnabled())
{
debugInfo("ReplicationBroker.publish() Publishing a " +
"message is not possible due to existing connection error.");
}
return false;
}
try
{
boolean credit;
ProtocolSession current_session;
Semaphore currentWindowSemaphore;
// save the session at the time when we acquire the
// sendwindow credit so that we can make sure later
// that the session did not change in between.
// This is necessary to make sure that we don't publish a message
// on a session with a credit that was acquired from a previous
// session.
synchronized (connectPhaseLock)
{
current_session = session;
currentWindowSemaphore = sendWindow;
}
// If the Replication domain has decided that there is a need to
// recover some changes then it is not allowed to send this
// change but it will be the responsibility of the recovery thread to
// do it.
if (!recoveryMsg & connectRequiresRecovery)
{
return false;
}
if (msg instanceof UpdateMsg)
{
// Acquiring the window credit must be done outside of the
// connectPhaseLock because it can be blocking and we don't
// want to hold off reconnection in case the connection dropped.
credit =
currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
} else
{
credit = true;
}
if (credit)
{
synchronized (connectPhaseLock)
{
// session may have been set to null in the connection phase
// when restarting the broker for example.
// check the session. If it has changed, some
// deconnection/reconnection happened and we need to restart from
// scratch.
if ((session != null) &&
(session == current_session))
{
session.publish(msg);
done = true;
}
}
}
if ((!credit) && (currentWindowSemaphore.availablePermits() == 0))
{
synchronized (connectPhaseLock)
{
// the window is still closed.
// Send a WindowProbeMsg message to wakeup the receiver in case the
// window update message was lost somehow...
// then loop to check again if connection was closed.
if (session != null) {
session.publish(new WindowProbeMsg());
}
}
}
} catch (IOException e)
{
if (!retryOnFailure)
return false;
// The receive threads should handle reconnection or
// mark this broker in error. Just retry.
synchronized (connectPhaseLock)
{
try
{
connectPhaseLock.wait(100);
} catch (InterruptedException e1)
{
// ignore
if (debugEnabled())
{
debugInfo("ReplicationBroker.publish() " +
"Interrupted exception raised : " + e.getLocalizedMessage());
}
}
}
} catch (InterruptedException e)
{
// just loop.
if (debugEnabled())
{
debugInfo("ReplicationBroker.publish() " +
"Interrupted exception raised." + e.getLocalizedMessage());
}
}
}
return true;
}
/**
* Receive a message.
* This method is not multithread safe and should either always be
* called in a single thread or protected by a locking mechanism
* before being called. This is a wrapper to the method with a boolean version
* so that we do not have to modify existing tests.
*
* @return the received message
* @throws SocketTimeoutException if the timeout set by setSoTimeout
* has expired
*/
public ReplicationMsg receive() throws SocketTimeoutException
{
return receive(false, true, false);
}
/**
* Receive a message.
* This method is not multithread safe and should either always be
* called in a single thread or protected by a locking mechanism
* before being called.
*
* @param reconnectToTheBestRS Whether broker will automatically switch
* to the best suitable RS.
* @param reconnectOnFailure Whether broker will automatically reconnect
* on failure.
* @param returnOnTopoChange Whether broker should return TopologyMsg
* received.
* @return the received message
*
* @throws SocketTimeoutException if the timeout set by setSoTimeout
* has expired
*/
public ReplicationMsg receive(boolean reconnectToTheBestRS,
boolean reconnectOnFailure, boolean returnOnTopoChange)
throws SocketTimeoutException
{
while (!shutdown)
{
if (reconnectOnFailure && !connected)
{
// infinite try to reconnect
reStart(null, true);
}
// Save session information for later in case we need it for log messages
// after the session has been closed and/or failed.
final ProtocolSession savedSession = session;
if (savedSession == null)
{
// Must be shutting down.
break;
}
final int replicationServerID = rsServerId;
try
{
ReplicationMsg msg = savedSession.receive();
if (msg instanceof UpdateMsg)
{
synchronized (this)
{
rcvWindow--;
}
}
if (msg instanceof WindowMsg)
{
WindowMsg windowMsg = (WindowMsg) msg;
sendWindow.release(windowMsg.getNumAck());
}
else if (msg instanceof TopologyMsg)
{
TopologyMsg topoMsg = (TopologyMsg) msg;
receiveTopo(topoMsg);
if (reconnectToTheBestRS)
{
// Reset wait time before next computation of best server
mustRunBestServerCheckingAlgorithm = 0;
}
// Caller wants to check what's changed
if (returnOnTopoChange)
return msg;
}
else if (msg instanceof StopMsg)
{
/*
* RS performs a proper disconnection
*/
Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED
.get(replicationServerID,
savedSession.getReadableRemoteAddress(),
serverId, baseDn);
logError(message);
// Try to find a suitable RS
this.reStart(savedSession, true);
}
else if (msg instanceof MonitorMsg)
{
// This is the response to a MonitorRequest that was sent earlier or
// the regular message of the monitoring publisher of the RS.
MonitorMsg monitorMsg = (MonitorMsg) msg;
// Extract and store replicas ServerStates
replicaStates = new HashMap();
for (int srvId : toIterable(monitorMsg.ldapIterator()))
{
replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
}
// Notify the sender that the response was received.
synchronized (monitorResponse)
{
monitorResponse.set(true);
monitorResponse.notify();
}
// Update the replication servers ServerStates with new received info
for (int srvId : toIterable(monitorMsg.rsIterator()))
{
ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
if (rsInfo != null)
{
rsInfo.update(monitorMsg.getRSServerState(srvId));
}
}
// Now if it is allowed, compute the best replication server to see if
// it is still the one we are currently connected to. If not,
// disconnect properly and let the connection algorithm re-connect to
// best replication server
if (reconnectToTheBestRS)
{
mustRunBestServerCheckingAlgorithm++;
if (mustRunBestServerCheckingAlgorithm == 2)
{
// Stable topology (no topo msg since few seconds): proceed with
// best server checking.
ReplicationServerInfo bestServerInfo =
computeBestReplicationServer(false, rsServerId, state,
replicationServerInfos, serverId, baseDn, groupId,
generationID);
if ((rsServerId != -1) && ((bestServerInfo == null) ||
(bestServerInfo.getServerId() != rsServerId)))
{
// The best replication server is no more the one we are
// currently using. Disconnect properly then reconnect.
Message message;
if (bestServerInfo == null)
{
message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
serverId, replicationServerID,
savedSession.getReadableRemoteAddress(),
baseDn);
}
else
{
message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
serverId, replicationServerID,
savedSession.getReadableRemoteAddress(),
bestServerInfo.getServerId(), baseDn);
}
logError(message);
reStart(true);
}
// Reset wait time before next computation of best server
mustRunBestServerCheckingAlgorithm = 0;
}
}
}
else
{
return msg;
}
}
catch (SocketTimeoutException e)
{
throw e;
}
catch (Exception e)
{
if (debugEnabled())
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
if (!shutdown)
{
final ProtocolSession tmpSession = session;
if (tmpSession == null || !tmpSession.closeInitiated())
{
/*
* We did not initiate the close on our side, log an error message.
*/
Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED
.get(serverId, baseDn, replicationServerID,
savedSession.getReadableRemoteAddress());
logError(message);
}
if (reconnectOnFailure)
{
reStart(savedSession, true);
}
else
{
break; // does not seem necessary to explicitly disconnect ..
}
}
}
} // while !shutdown
return null;
}
/**
* Gets the States of all the Replicas currently in the
* Topology.
* When this method is called, a Monitoring message will be sent
* to the Replication Server to which this domain is currently connected
* so that it computes a table containing information about
* all Directory Servers in the topology.
* This Computation involves communications will all the servers
* currently connected and
*
* @return The States of all Replicas in the topology (except us)
*/
public Map getReplicaStates()
{
monitorResponse.set(false);
// publish Monitor Request Message to the Replication Server
publish(new MonitorRequestMsg(serverId, getRsServerId()));
// wait for Response up to 10 seconds.
try
{
synchronized (monitorResponse)
{
if (monitorResponse.get() == false)
{
monitorResponse.wait(10000);
}
}
} catch (InterruptedException e)
{
}
return replicaStates;
}
/**
* This method allows to do the necessary computing for the window
* management after treatment by the worker threads.
*
* This should be called once the replay thread have done their job
* and the window can be open again.
*/
public synchronized void updateWindowAfterReplay()
{
try
{
updateDoneCount++;
if ((updateDoneCount >= halfRcvWindow) && (session != null))
{
session.publish(new WindowMsg(updateDoneCount));
rcvWindow += updateDoneCount;
updateDoneCount = 0;
}
} catch (IOException e)
{
// Any error on the socket will be handled by the thread calling receive()
// just ignore.
}
}
/**
* stop the server.
*/
public void stop()
{
if (debugEnabled())
debugInfo("ReplicationBroker " + serverId + " is stopping and will" +
" close the connection to replication server " + rsServerId + " for" +
" domain " + baseDn);
synchronized (startStopLock)
{
shutdown = true;
connected = false;
stopRSHeartBeatMonitoring();
stopChangeTimeHeartBeatPublishing();
replicationServer = "stopped";
rsGroupId = -1;
rsServerId = -1;
rsServerUrl = null;
if (session != null)
{
session.close();
}
}
}
/**
* Set a timeout value.
* With this option set to a non-zero value, calls to the receive() method
* block for only this amount of time after which a
* java.net.SocketTimeoutException is raised.
* The Broker is valid and usable even after such an Exception is raised.
*
* @param timeout the specified timeout, in milliseconds.
* @throws SocketException if there is an error in the underlying protocol,
* such as a TCP error.
*/
public void setSoTimeout(int timeout) throws SocketException
{
this.timeout = timeout;
if (session != null)
{
session.setSoTimeout(timeout);
}
}
/**
* Get the name of the replicationServer to which this broker is currently
* connected.
*
* @return the name of the replicationServer to which this domain
* is currently connected.
*/
public String getReplicationServer()
{
return replicationServer;
}
/**
* Get the maximum receive window size.
*
* @return The maximum receive window size.
*/
public int getMaxRcvWindow()
{
return maxRcvWindow;
}
/**
* Get the current receive window size.
*
* @return The current receive window size.
*/
public int getCurrentRcvWindow()
{
return rcvWindow;
}
/**
* Get the maximum send window size.
*
* @return The maximum send window size.
*/
public int getMaxSendWindow()
{
return maxSendWindow;
}
/**
* Get the current send window size.
*
* @return The current send window size.
*/
public int getCurrentSendWindow()
{
if (connected)
{
return sendWindow.availablePermits();
} else
{
return 0;
}
}
/**
* Get the number of times the connection was lost.
* @return The number of times the connection was lost.
*/
public int getNumLostConnections()
{
return numLostConnections;
}
/**
* Change some configuration parameters.
*
* @param replicationServers The new list of replication servers.
* @param window The max window size.
* @param heartbeatInterval The heartBeat interval.
*
* @return A boolean indicating if the changes
* requires to restart the service.
* @param groupId The new group id to use
*/
public boolean changeConfig(
Collection replicationServers, int window, long heartbeatInterval,
byte groupId)
{
// These parameters needs to be renegotiated with the ReplicationServer
// so if they have changed, that requires restarting the session with
// the ReplicationServer.
Boolean needToRestartSession = false;
// A new session is necessary only when information regarding
// the connection is modified
if (this.replicationServerUrls == null
|| replicationServers.size() != this.replicationServerUrls.size()
|| !replicationServers.containsAll(this.replicationServerUrls)
|| window != this.maxRcvWindow
|| heartbeatInterval != this.heartbeatInterval
|| groupId != this.groupId)
{
needToRestartSession = true;
}
this.replicationServerUrls = replicationServers;
this.rcvWindow = window;
this.maxRcvWindow = window;
this.halfRcvWindow = window / 2;
this.heartbeatInterval = heartbeatInterval;
this.groupId = groupId;
return needToRestartSession;
}
/**
* Get the version of the replication protocol.
* @return The version of the replication protocol.
*/
public short getProtocolVersion()
{
return protocolVersion;
}
/**
* Check if the broker is connected to a ReplicationServer and therefore
* ready to received and send Replication Messages.
*
* @return true if the server is connected, false if not.
*/
public boolean isConnected()
{
return connected;
}
private boolean debugEnabled()
{
return false;
}
private static void debugInfo(String s)
{
logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
TRACER.debugInfo(s);
}
/**
* Determine whether the connection to the replication server is encrypted.
* @return true if the connection is encrypted, false otherwise.
*/
public boolean isSessionEncrypted()
{
boolean isEncrypted = false;
if (session != null)
{
return session.isEncrypted();
}
return isEncrypted;
}
/**
* Signals the RS we just entered a new status.
* @param newStatus The status the local DS just entered
*/
public void signalStatusChange(ServerStatus newStatus)
{
try
{
ChangeStatusMsg csMsg = new ChangeStatusMsg(ServerStatus.INVALID_STATUS,
newStatus);
session.publish(csMsg);
} catch (IOException ex)
{
Message message = ERR_EXCEPTION_SENDING_CS.get(
baseDn,
Integer.toString(serverId),
ex.getLocalizedMessage() + stackTraceToSingleLineString(ex));
logError(message);
}
}
/**
* Sets the group id of the broker.
* @param groupId The new group id.
*/
public void setGroupId(byte groupId)
{
this.groupId = groupId;
}
/**
* Gets the info for DSs in the topology (except us).
* @return The info for DSs in the topology (except us)
*/
public List getDsList()
{
return dsList;
}
/**
* Gets the info for RSs in the topology (except the one we are connected
* to).
* @return The info for RSs in the topology (except the one we are connected
* to)
*/
public List getRsList()
{
List result = new ArrayList();
for (ReplicationServerInfo replicationServerInfo :
replicationServerInfos.values())
{
result.add(replicationServerInfo.toRSInfo());
}
return result;
}
/**
* Computes the list of DSs connected to a particular RS.
* @param rsId The RS id of the server one wants to know the connected DSs
* @param dsList The list of DSinfo from which to compute things
* @return The list of connected DSs to the server rsId
*/
private List computeConnectedDSs(int rsId, List dsList)
{
List connectedDSs = new ArrayList();
if (rsServerId == rsId)
{
// If we are computing connected DSs for the RS we are connected
// to, we should count the local DS as the DSInfo of the local DS is not
// sent by the replication server in the topology message. We must count
// ourself as a connected server.
connectedDSs.add(serverId);
}
for (DSInfo dsInfo : dsList)
{
if (dsInfo.getRsId() == rsId)
connectedDSs.add(dsInfo.getDsId());
}
return connectedDSs;
}
/**
* Processes an incoming TopologyMsg.
* Updates the structures for the local view of the topology.
*
* @param topoMsg The topology information received from RS.
*/
public void receiveTopo(TopologyMsg topoMsg)
{
if (debugEnabled())
debugInfo(this + " receive TopologyMsg=" + topoMsg);
// Store new DS list
dsList = topoMsg.getDsList();
// Update replication server info list with the received topology
// information
List rsToKeepList = new ArrayList();
for (RSInfo rsInfo : topoMsg.getRsList())
{
int rsId = rsInfo.getId();
rsToKeepList.add(rsId); // Mark this server as still existing
List connectedDSs = computeConnectedDSs(rsId, dsList);
ReplicationServerInfo replicationServerInfo =
replicationServerInfos.get(rsId);
if (replicationServerInfo == null)
{
// New replication server, create info for it add it to the list
replicationServerInfo =
new ReplicationServerInfo(rsInfo, connectedDSs);
// Set the locally configured flag for this new RS only if it is
// configured
updateRSInfoLocallyConfiguredStatus(replicationServerInfo);
replicationServerInfos.put(rsId, replicationServerInfo);
} else
{
// Update the existing info for the replication server
replicationServerInfo.update(rsInfo, connectedDSs);
}
}
/**
* Now remove any replication server that may have disappeared from the
* topology.
*/
Iterator> rsInfoIt =
replicationServerInfos.entrySet().iterator();
while (rsInfoIt.hasNext())
{
Entry rsInfoEntry = rsInfoIt.next();
if (!rsToKeepList.contains(rsInfoEntry.getKey()))
{
// This replication server has quit the topology, remove it from the
// list
rsInfoIt.remove();
}
}
if (domain != null)
{
for (DSInfo info : dsList)
{
domain.setEclIncludes(info.getDsId(), info.getEclIncludes(),
info.getEclIncludesForDeletes());
}
}
}
/**
* Check if the broker could not find any Replication Server and therefore
* connection attempt failed.
*
* @return true if the server could not connect to any Replication Server.
*/
public boolean hasConnectionError()
{
return connectionError;
}
/**
* Starts publishing to the RS the current timestamp used in this server.
*/
public void startChangeTimeHeartBeatPublishing()
{
// Start a CN heartbeat thread.
if (changeTimeHeartbeatSendInterval > 0)
{
String threadName = "Replica DS("
+ this.getServerId()
+ ") change time heartbeat publisher for domain \""
+ this.baseDn + "\" to RS(" + this.getRsServerId()
+ ") at " + session.getReadableRemoteAddress();
ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
threadName, session, changeTimeHeartbeatSendInterval,
serverId);
ctHeartbeatPublisherThread.start();
} else
{
if (debugEnabled())
debugInfo(this +
" is not configured to send CN heartbeat interval");
}
}
/**
* Stops publishing to the RS the current timestamp used in this server.
*/
public synchronized void stopChangeTimeHeartBeatPublishing()
{
if (ctHeartbeatPublisherThread != null)
{
ctHeartbeatPublisherThread.shutdown();
ctHeartbeatPublisherThread = null;
}
}
/**
* Set a new change time heartbeat interval to this broker.
* @param changeTimeHeartbeatInterval The new interval (in ms).
*/
public void setChangeTimeHeartbeatInterval(int changeTimeHeartbeatInterval)
{
stopChangeTimeHeartBeatPublishing();
this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval;
startChangeTimeHeartBeatPublishing();
}
/**
* Set the connectRequiresRecovery to the provided value.
* This flag is used to indicate if a recovery of Update is necessary
* after a reconnection to a RS.
* It is the responsibility of the ReplicationDomain to set it during the
* sessionInitiated phase.
*
* @param b the new value of the connectRequiresRecovery.
*/
public void setRecoveryRequired(boolean b)
{
connectRequiresRecovery = b;
}
/**
* Returns whether the broker is shutting down.
* @return whether the broker is shutting down.
*/
public boolean shuttingDown()
{
return shutdown;
}
}