/*
|
* CDDL HEADER START
|
*
|
* The contents of this file are subject to the terms of the
|
* Common Development and Distribution License, Version 1.0 only
|
* (the "License"). You may not use this file except in compliance
|
* with the License.
|
*
|
* You can obtain a copy of the license at
|
* trunk/opends/resource/legal-notices/OpenDS.LICENSE
|
* or https://OpenDS.dev.java.net/OpenDS.LICENSE.
|
* See the License for the specific language governing permissions
|
* and limitations under the License.
|
*
|
* When distributing Covered Code, include this CDDL HEADER in each
|
* file and include the License file at
|
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
|
* add the following below this CDDL HEADER, with the fields enclosed
|
* by brackets "[]" replaced with your own identifying information:
|
* Portions Copyright [yyyy] [name of copyright owner]
|
*
|
* CDDL HEADER END
|
*
|
*
|
* Copyright 2006-2010 Sun Microsystems, Inc.
|
* Portions Copyright 2011-2013 ForgeRock AS
|
*/
|
package org.opends.server.replication.service;
|
|
import java.io.IOException;
|
import java.math.BigDecimal;
|
import java.math.MathContext;
|
import java.math.RoundingMode;
|
import java.net.ConnectException;
|
import java.net.Socket;
|
import java.net.SocketException;
|
import java.net.SocketTimeoutException;
|
import java.util.*;
|
import java.util.Map.Entry;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.TimeUnit;
|
|
import org.opends.messages.Message;
|
import org.opends.messages.MessageBuilder;
|
import org.opends.server.admin.std.server.ReplicationDomainCfg;
|
import org.opends.server.core.DirectoryServer;
|
import org.opends.server.loggers.debug.DebugTracer;
|
import org.opends.server.replication.common.*;
|
import org.opends.server.replication.plugin.MultimasterReplication;
|
import org.opends.server.replication.protocol.*;
|
import org.opends.server.types.DN;
|
import org.opends.server.types.DebugLogLevel;
|
import org.opends.server.types.HostPort;
|
import org.opends.server.util.ServerConstants;
|
|
import static org.opends.messages.ReplicationMessages.*;
|
import static org.opends.server.loggers.ErrorLogger.*;
|
import static org.opends.server.loggers.debug.DebugLogger.*;
|
import static org.opends.server.replication.protocol.ProtocolVersion.*;
|
import static org.opends.server.replication.server.ReplicationServer.*;
|
import static org.opends.server.util.StaticUtils.*;
|
|
/**
|
* The broker for Multi-master Replication.
|
*/
|
public class ReplicationBroker
|
{
|
|
/**
|
* The tracer object for the debug logger.
|
*/
|
private static final DebugTracer TRACER = getTracer();
|
private volatile boolean shutdown = false;
|
private final Object startStopLock = new Object();
|
private volatile ReplicationDomainCfg config;
|
private volatile boolean connected = false;
|
/**
|
* String reported under CSN=monitor when there is no connected RS.
|
*/
|
public final static String NO_CONNECTED_SERVER = "Not connected";
|
private volatile String replicationServer = NO_CONNECTED_SERVER;
|
private volatile Session session;
|
private final ServerState state;
|
private Semaphore sendWindow;
|
private int maxSendWindow;
|
private int rcvWindow = 100;
|
private int halfRcvWindow = rcvWindow / 2;
|
private int timeout = 0;
|
private short protocolVersion;
|
private ReplSessionSecurity replSessionSecurity;
|
/** The group id of the RS we are connected to. */
|
private byte rsGroupId = -1;
|
/** The server id of the RS we are connected to. */
|
private int rsServerId = -1;
|
/** The server URL of the RS we are connected to. */
|
private String rsServerUrl;
|
/** Our replication domain. */
|
private ReplicationDomain domain;
|
/**
|
* This object is used as a conditional event to be notified about
|
* the reception of monitor information from the Replication Server.
|
*/
|
private final MutableBoolean monitorResponse = new MutableBoolean(false);
|
/**
|
* A Map containing the ServerStates of all the replicas in the topology
|
* as seen by the ReplicationServer the last time it was polled or the last
|
* time it published monitoring information.
|
*/
|
private Map<Integer, ServerState> replicaStates =
|
new HashMap<Integer, ServerState>();
|
/**
|
* A thread to monitor heartbeats on the session.
|
*/
|
private HeartbeatMonitor heartbeatMonitor;
|
/**
|
* The number of times the connection was lost.
|
*/
|
private int numLostConnections = 0;
|
/**
|
* When the broker cannot connect to any replication server
|
* it log an error and keeps continuing every second.
|
* This boolean is set when the first failure happens and is used
|
* to avoid repeating the error message for further failure to connect
|
* and to know that it is necessary to print a new message when the broker
|
* finally succeed to connect.
|
*/
|
private volatile boolean connectionError = false;
|
private final Object connectPhaseLock = new Object();
|
/**
|
* The thread that publishes messages to the RS containing the current
|
* change time of this DS.
|
*/
|
private CTHeartbeatPublisherThread ctHeartbeatPublisherThread;
|
/*
|
* Properties for the last topology info received from the network.
|
*/
|
/**
|
* Info for other DSs.
|
* <p>
|
* Warning: does not contain info for us (for our server id)
|
*/
|
private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
|
private volatile long generationID;
|
private volatile int updateDoneCount = 0;
|
private volatile boolean connectRequiresRecovery = false;
|
|
/**
|
* The map of replication server info initialized at connection time and
|
* regularly updated. This is used to decide to which best suitable
|
* replication server one wants to connect. Key: replication server id Value:
|
* replication server info for the matching replication server id
|
*/
|
private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos;
|
|
/**
|
* This integer defines when the best replication server checking algorithm
|
* should be engaged.
|
* Every time a monitoring message (each monitoring publisher period) is
|
* received, it is incremented. When it reaches 2, we run the checking
|
* algorithm to see if we must reconnect to another best replication server.
|
* Then we reset the value to 0. But when a topology message is received, the
|
* integer is reset to 0. This ensures that we wait at least one monitoring
|
* publisher period before running the algorithm, but also that we wait at
|
* least for a monitoring period after the last received topology message
|
* (topology stabilization).
|
*/
|
private int mustRunBestServerCheckingAlgorithm = 0;
|
|
/**
|
* The monitor provider for this replication domain. The name of the monitor
|
* includes the local address and must therefore be re-registered every time
|
* the session is re-established or destroyed. The monitor provider can only
|
* be created (i.e. non-null) if there is a replication domain, which is not
|
* the case in unit tests.
|
*/
|
private final ReplicationMonitor monitor;
|
|
/**
 * Builds a broker bound to the given replication domain.
 *
 * @param replicationDomain The replication domain that is creating us; may
 *          be null (some unit tests), in which case no monitor is created.
 * @param state The ServerState that should be used by this broker
 *          when negotiating the session with the replicationServer.
 * @param config The configuration to use.
 * @param generationId The generationId for the server associated to the
 *          provided serverId and for the domain associated to the provided
 *          baseDN.
 * @param replSessionSecurity The session security configuration.
 */
public ReplicationBroker(ReplicationDomain replicationDomain,
    ServerState state, ReplicationDomainCfg config, long generationId,
    ReplSessionSecurity replSessionSecurity)
{
  this.domain = replicationDomain;
  this.state = state;
  this.config = config;
  this.replSessionSecurity = replSessionSecurity;
  this.generationID = generationId;
  this.protocolVersion = ProtocolVersion.getCurrentVersion();

  // The receive window is computed once the configuration has been stored.
  this.rcvWindow = getMaxRcvWindow();
  this.halfRcvWindow = rcvWindow / 2;

  // A monitor only makes sense when there is a replication domain.
  if (replicationDomain != null)
  {
    this.monitor = new ReplicationMonitor(replicationDomain);
  }
  else
  {
    this.monitor = null;
  }
  registerReplicationMonitor();
}
|
|
/**
|
* Start the ReplicationBroker.
|
*/
|
public void start()
|
{
|
synchronized (startStopLock)
|
{
|
shutdown = false;
|
this.rcvWindow = getMaxRcvWindow();
|
connect();
|
}
|
}
|
|
/**
|
* Gets the group id of the RS we are connected to.
|
* @return The group id of the RS we are connected to
|
*/
|
public byte getRsGroupId()
|
{
|
return rsGroupId;
|
}
|
|
/**
|
* Gets the server id of the RS we are connected to.
|
* @return The server id of the RS we are connected to
|
*/
|
public int getRsServerId()
|
{
|
return rsServerId;
|
}
|
|
/**
|
* Gets the server id.
|
* @return The server id
|
*/
|
public int getServerId()
|
{
|
return config.getServerId();
|
}
|
|
private DN getBaseDN()
|
{
|
return config.getBaseDN();
|
}
|
|
private Set<String> getReplicationServerUrls()
|
{
|
return config.getReplicationServer();
|
}
|
|
private byte getGroupId()
|
{
|
return (byte) config.getGroupId();
|
}
|
|
/**
|
* Gets the server id.
|
* @return The server id
|
*/
|
private long getGenerationID()
|
{
|
if (domain != null)
|
{
|
// Update the generation id
|
generationID = domain.getGenerationID();
|
}
|
return generationID;
|
}
|
|
/**
|
* Set the generation id - for test purpose.
|
* @param generationID The generation id
|
*/
|
public void setGenerationID(long generationID)
|
{
|
this.generationID = generationID;
|
}
|
|
/**
|
* Gets the server url of the RS we are connected to.
|
* @return The server url of the RS we are connected to
|
*/
|
public String getRsServerUrl()
|
{
|
return rsServerUrl;
|
}
|
|
/**
|
* Sets the locally configured flag for the passed ReplicationServerInfo
|
* object, analyzing the local configuration.
|
* @param rsInfo the Replication server to check and update
|
*/
|
private void updateRSInfoLocallyConfiguredStatus(ReplicationServerInfo rsInfo)
|
{
|
// Determine if the passed ReplicationServerInfo has a URL that is present
|
// in the locally configured replication servers
|
String rsUrl = rsInfo.getServerURL();
|
if (rsUrl == null)
|
{
|
// The ReplicationServerInfo has been generated from a server with
|
// no URL in TopologyMsg (i.e: with replication protocol version < 4):
|
// ignore this server as we do not know how to connect to it
|
rsInfo.setLocallyConfigured(false);
|
return;
|
}
|
for (String serverUrl : getReplicationServerUrls())
|
{
|
if (isSameReplicationServerUrl(serverUrl, rsUrl))
|
{
|
// This RS is locally configured, mark this
|
rsInfo.setLocallyConfigured(true);
|
rsInfo.serverURL = serverUrl;
|
return;
|
}
|
}
|
rsInfo.setLocallyConfigured(false);
|
}
|
|
/**
|
* Compares 2 replication servers addresses and returns true if they both
|
* represent the same replication server instance.
|
* @param rs1Url Replication server 1 address
|
* @param rs2Url Replication server 2 address
|
* @return True if both replication server addresses represent the same
|
* replication server instance, false otherwise.
|
*/
|
private static boolean isSameReplicationServerUrl(String rs1Url,
|
String rs2Url)
|
{
|
try
|
{
|
final HostPort hp1 = HostPort.valueOf(rs1Url);
|
final HostPort hp2 = HostPort.valueOf(rs2Url);
|
return hp1.isEquivalentTo(hp2);
|
}
|
catch (RuntimeException ex)
|
{
|
// Not a RS url or not a valid port number: should not happen
|
return false;
|
}
|
}
|
|
/**
|
* Bag class for keeping info we get from a replication server in order to
|
* compute the best one to connect to. This is in fact a wrapper to a
|
* ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be
|
* updated with a info coming from received topology messages or monitoring
|
* messages.
|
*/
|
public static class ReplicationServerInfo
|
{
|
private short protocolVersion;
|
private long generationId;
|
private byte groupId = -1;
|
private int serverId;
|
/** Received server URL. */
|
private String serverURL;
|
private DN baseDN;
|
private int windowSize;
|
private ServerState serverState;
|
private boolean sslEncryption;
|
private int degradedStatusThreshold = -1;
|
/** Keeps the 1 value if created with a ReplServerStartMsg. */
|
private int weight = 1;
|
/** Keeps the 0 value if created with a ReplServerStartMsg. */
|
private int connectedDSNumber = 0;
|
private List<Integer> connectedDSs;
|
/**
|
* Is this RS locally configured? (the RS is recognized as a usable server).
|
*/
|
private boolean locallyConfigured = true;
|
|
/**
|
* Create a new instance of ReplicationServerInfo wrapping the passed
|
* message.
|
* @param msg Message to wrap.
|
* @param newServerURL Override serverURL.
|
* @return The new instance wrapping the passed message.
|
* @throws IllegalArgumentException If the passed message has an unexpected
|
* type.
|
*/
|
public static ReplicationServerInfo newInstance(
|
ReplicationMsg msg, String newServerURL) throws IllegalArgumentException
|
{
|
ReplicationServerInfo rsInfo = newInstance(msg);
|
rsInfo.serverURL = newServerURL;
|
return rsInfo;
|
}
|
|
/**
|
* Create a new instance of ReplicationServerInfo wrapping the passed
|
* message.
|
* @param msg Message to wrap.
|
* @return The new instance wrapping the passed message.
|
* @throws IllegalArgumentException If the passed message has an unexpected
|
* type.
|
*/
|
public static ReplicationServerInfo newInstance(
|
ReplicationMsg msg) throws IllegalArgumentException
|
{
|
if (msg instanceof ReplServerStartMsg)
|
{
|
// This is a ReplServerStartMsg (RS uses protocol V3 or under)
|
ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg) msg;
|
return new ReplicationServerInfo(replServerStartMsg);
|
} else if (msg instanceof ReplServerStartDSMsg)
|
{
|
// This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
|
ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg) msg;
|
return new ReplicationServerInfo(replServerStartDSMsg);
|
}
|
|
// Unsupported message type: should not happen
|
throw new IllegalArgumentException("Unexpected PDU type: " +
|
msg.getClass().getName() + " :\n" + msg);
|
}
|
|
/**
|
* Constructs a ReplicationServerInfo object wrapping a
|
* {@link ReplServerStartMsg}.
|
*
|
* @param replServerStartMsg
|
* The {@link ReplServerStartMsg} this object will wrap.
|
*/
|
private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg)
|
{
|
this.protocolVersion = replServerStartMsg.getVersion();
|
this.generationId = replServerStartMsg.getGenerationId();
|
this.groupId = replServerStartMsg.getGroupId();
|
this.serverId = replServerStartMsg.getServerId();
|
this.serverURL = replServerStartMsg.getServerURL();
|
this.baseDN = replServerStartMsg.getBaseDN();
|
this.windowSize = replServerStartMsg.getWindowSize();
|
this.serverState = replServerStartMsg.getServerState();
|
this.sslEncryption = replServerStartMsg.getSSLEncryption();
|
this.degradedStatusThreshold =
|
replServerStartMsg.getDegradedStatusThreshold();
|
}
|
|
/**
|
* Constructs a ReplicationServerInfo object wrapping a
|
* {@link ReplServerStartDSMsg}.
|
*
|
* @param replServerStartDSMsg
|
* The {@link ReplServerStartDSMsg} this object will wrap.
|
*/
|
private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
|
{
|
this.protocolVersion = replServerStartDSMsg.getVersion();
|
this.generationId = replServerStartDSMsg.getGenerationId();
|
this.groupId = replServerStartDSMsg.getGroupId();
|
this.serverId = replServerStartDSMsg.getServerId();
|
this.serverURL = replServerStartDSMsg.getServerURL();
|
this.baseDN = replServerStartDSMsg.getBaseDN();
|
this.windowSize = replServerStartDSMsg.getWindowSize();
|
this.serverState = replServerStartDSMsg.getServerState();
|
this.sslEncryption = replServerStartDSMsg.getSSLEncryption();
|
this.degradedStatusThreshold =
|
replServerStartDSMsg.getDegradedStatusThreshold();
|
this.weight = replServerStartDSMsg.getWeight();
|
this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber();
|
}
|
|
/**
|
* Get the server state.
|
* @return The server state
|
*/
|
public ServerState getServerState()
|
{
|
return serverState;
|
}
|
|
/**
|
* get the group id.
|
* @return The group id
|
*/
|
public byte getGroupId()
|
{
|
return groupId;
|
}
|
|
/**
|
* Get the server protocol version.
|
* @return the protocolVersion
|
*/
|
public short getProtocolVersion()
|
{
|
return protocolVersion;
|
}
|
|
/**
|
* Get the generation id.
|
* @return the generationId
|
*/
|
public long getGenerationId()
|
{
|
return generationId;
|
}
|
|
/**
|
* Get the server id.
|
* @return the serverId
|
*/
|
public int getServerId()
|
{
|
return serverId;
|
}
|
|
/**
|
* Get the server URL.
|
* @return the serverURL
|
*/
|
public String getServerURL()
|
{
|
return serverURL;
|
}
|
|
/**
|
* Get the base DN.
|
*
|
* @return the base DN
|
*/
|
public DN getBaseDN()
|
{
|
return baseDN;
|
}
|
|
/**
|
* Get the window size.
|
* @return the windowSize
|
*/
|
public int getWindowSize()
|
{
|
return windowSize;
|
}
|
|
/**
|
* Get the ssl encryption.
|
* @return the sslEncryption
|
*/
|
public boolean isSslEncryption()
|
{
|
return sslEncryption;
|
}
|
|
/**
|
* Get the degraded status threshold.
|
* @return the degradedStatusThreshold
|
*/
|
public int getDegradedStatusThreshold()
|
{
|
return degradedStatusThreshold;
|
}
|
|
/**
|
* Get the weight.
|
* @return the weight. Null if this object is a wrapper for
|
* a ReplServerStartMsg.
|
*/
|
public int getWeight()
|
{
|
return weight;
|
}
|
|
/**
|
* Get the connected DS number.
|
* @return the connectedDSNumber. Null if this object is a wrapper for
|
* a ReplServerStartMsg.
|
*/
|
public int getConnectedDSNumber()
|
{
|
return connectedDSNumber;
|
}
|
|
/**
|
* Constructs a new replication server info with the passed RSInfo
|
* internal values and the passed connected DSs.
|
* @param rsInfo The RSinfo to use for the update
|
* @param connectedDSs The new connected DSs
|
*/
|
public ReplicationServerInfo(RSInfo rsInfo, List<Integer> connectedDSs)
|
{
|
this.serverId = rsInfo.getId();
|
this.serverURL = rsInfo.getServerUrl();
|
this.generationId = rsInfo.getGenerationId();
|
this.groupId = rsInfo.getGroupId();
|
this.weight = rsInfo.getWeight();
|
this.connectedDSs = connectedDSs;
|
this.connectedDSNumber = connectedDSs.size();
|
this.serverState = new ServerState();
|
}
|
|
/**
|
* Converts the object to a RSInfo object.
|
* @return The RSInfo object matching this object.
|
*/
|
public RSInfo toRSInfo()
|
{
|
return new RSInfo(serverId, serverURL, generationId, groupId, weight);
|
}
|
|
/**
|
* Updates replication server info with the passed RSInfo internal values
|
* and the passed connected DSs.
|
* @param rsInfo The RSinfo to use for the update
|
* @param connectedDSs The new connected DSs
|
*/
|
public void update(RSInfo rsInfo, List<Integer> connectedDSs)
|
{
|
this.generationId = rsInfo.getGenerationId();
|
this.groupId = rsInfo.getGroupId();
|
this.weight = rsInfo.getWeight();
|
this.connectedDSs = connectedDSs;
|
this.connectedDSNumber = connectedDSs.size();
|
}
|
|
/**
|
* Updates replication server info with the passed server state.
|
* @param serverState The ServerState to use for the update
|
*/
|
public void update(ServerState serverState)
|
{
|
if (this.serverState != null)
|
{
|
this.serverState.update(serverState);
|
} else
|
{
|
this.serverState = serverState;
|
}
|
}
|
|
/**
|
* Get the getConnectedDSs.
|
* @return the getConnectedDSs
|
*/
|
public List<Integer> getConnectedDSs()
|
{
|
return connectedDSs;
|
}
|
|
/**
|
* Gets the locally configured status for this RS.
|
* @return the locallyConfigured
|
*/
|
public boolean isLocallyConfigured()
|
{
|
return locallyConfigured;
|
}
|
|
/**
|
* Sets the locally configured status for this RS.
|
* @param locallyConfigured the locallyConfigured to set
|
*/
|
public void setLocallyConfigured(boolean locallyConfigured)
|
{
|
this.locallyConfigured = locallyConfigured;
|
}
|
|
/**
|
* Returns a string representation of this object.
|
* @return A string representation of this object.
|
*/
|
@Override
|
public String toString()
|
{
|
return "Url:" + this.serverURL + " ServerId:" + this.serverId
|
+ " GroupId:" + this.groupId;
|
}
|
}
|
|
private void connect()
|
{
|
if (getBaseDN().toNormalizedString().equalsIgnoreCase(
|
ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
|
{
|
connectAsECL();
|
} else
|
{
|
connectAsDataServer();
|
}
|
}
|
|
/**
|
* Contacts all replication servers to get information from them and being
|
* able to choose the more suitable.
|
* @return the collected information.
|
*/
|
private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
|
{
|
Map<Integer, ReplicationServerInfo> rsInfos =
|
new ConcurrentHashMap<Integer, ReplicationServerInfo>();
|
|
for (String serverUrl : getReplicationServerUrls())
|
{
|
// Connect to server + get and store info about it
|
ReplicationServerInfo rsInfo =
|
performPhaseOneHandshake(serverUrl, false, false);
|
if (rsInfo != null)
|
{
|
rsInfos.put(rsInfo.getServerId(), rsInfo);
|
}
|
}
|
|
return rsInfos;
|
}
|
|
/**
|
* Special aspects of connecting as ECL (External Change Log) compared to
|
* connecting as data server are :
|
* <ul>
|
* <li>1 single RS configured</li>
|
* <li>so no choice of the preferred RS</li>
|
* <li>?? Heartbeat</li>
|
* <li>Start handshake is :
|
*
|
* <pre>
|
* Broker ---> StartECLMsg ---> RS
|
* <---- ReplServerStartMsg ---
|
* ---> StartSessionECLMsg --> RS
|
* </pre>
|
*
|
* </li>
|
* </ul>
|
*/
|
private void connectAsECL()
|
{
|
// FIXME:ECL List of RS to connect is for now limited to one RS only
|
String bestServer = getReplicationServerUrls().iterator().next();
|
|
if (performPhaseOneHandshake(bestServer, true, true) != null)
|
{
|
performECLPhaseTwoHandshake(bestServer);
|
}
|
}
|
|
/**
|
* Connect to a ReplicationServer.
|
*
|
* Handshake sequences between a DS and a RS is divided into 2 logical
|
* consecutive phases (phase 1 and phase 2). DS always initiates connection
|
* and always sends first message:
|
*
|
* DS<->RS:
|
* -------
|
*
|
* phase 1:
|
* DS --- ServerStartMsg ---> RS
|
* DS <--- ReplServerStartDSMsg --- RS
|
* phase 2:
|
* DS --- StartSessionMsg ---> RS
|
* DS <--- TopologyMsg --- RS
|
*
|
* Before performing a full handshake sequence, DS searches for best suitable
|
* RS by making only phase 1 handshake to every RS he knows then closing
|
* connection. This allows to gather information on available RSs and then
|
* decide with which RS the full handshake (phase 1 then phase 2) will be
|
* finally performed.
|
*
|
* @throws NumberFormatException address was invalid
|
*/
|
private void connectAsDataServer()
|
{
|
/*
|
May have created a broker with null replication domain for
|
unit test purpose.
|
*/
|
if (domain != null)
|
{
|
/*
|
If a first connect or a connection failure occur, we go through here.
|
force status machine to NOT_CONNECTED_STATUS so that monitoring can
|
see that we are not connected.
|
*/
|
domain.toNotConnectedStatus();
|
}
|
|
/*
|
Stop any existing heartbeat monitor and changeTime publisher
|
from a previous session.
|
*/
|
stopRSHeartBeatMonitoring();
|
stopChangeTimeHeartBeatPublishing();
|
mustRunBestServerCheckingAlgorithm = 0;
|
|
synchronized (connectPhaseLock)
|
{
|
final int serverId = getServerId();
|
final DN baseDN = getBaseDN();
|
|
/*
|
* Connect to each replication server and get their ServerState then find
|
* out which one is the best to connect to.
|
*/
|
if (debugEnabled())
|
debugInfo("phase 1 : will perform PhaseOneH with each RS in order to "
|
+ "elect the preferred one");
|
|
// Get info from every available replication servers
|
replicationServerInfos = collectReplicationServersInfo();
|
|
ReplicationServerInfo electedRsInfo = null;
|
|
if (replicationServerInfos.size() > 0)
|
{
|
// At least one server answered, find the best one.
|
RSEvaluations evals = computeBestReplicationServer(true, -1, state,
|
replicationServerInfos, serverId, getGroupId(), getGenerationID());
|
electedRsInfo = evals.getBestRS();
|
|
// Best found, now initialize connection to this one (handshake phase 1)
|
if (debugEnabled())
|
debugInfo("phase 2 : will perform PhaseOneH with the preferred RS="
|
+ electedRsInfo);
|
electedRsInfo = performPhaseOneHandshake(
|
electedRsInfo.getServerURL(), true, false);
|
|
if (electedRsInfo != null)
|
{
|
/*
|
Update replication server info with potentially more up to date
|
data (server state for instance may have changed)
|
*/
|
replicationServerInfos
|
.put(electedRsInfo.getServerId(), electedRsInfo);
|
|
// Handshake phase 1 exchange went well
|
|
// Compute in which status we are starting the session to tell the RS
|
ServerStatus initStatus =
|
computeInitialServerStatus(electedRsInfo.getGenerationId(),
|
electedRsInfo.getServerState(),
|
electedRsInfo.getDegradedStatusThreshold(),
|
getGenerationID());
|
|
// Perform session start (handshake phase 2)
|
TopologyMsg topologyMsg = performPhaseTwoHandshake(
|
electedRsInfo.getServerURL(), initStatus);
|
|
if (topologyMsg != null) // Handshake phase 2 exchange went well
|
{
|
connectToReplicationServer(electedRsInfo, initStatus, topologyMsg);
|
} // Could perform handshake phase 2 with best
|
|
} // Could perform handshake phase 1 with best
|
|
} // Reached some servers
|
|
// connected is set by connectToReplicationServer()
|
// and electedRsInfo isn't null then. Check anyway
|
if (electedRsInfo != null && connected)
|
{
|
connectPhaseLock.notify();
|
|
if ((electedRsInfo.getGenerationId() == getGenerationID())
|
|| (electedRsInfo.getGenerationId() == -1))
|
{
|
Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
|
.get(serverId, rsServerId, baseDN.toNormalizedString(),
|
session.getReadableRemoteAddress(),
|
getGenerationID());
|
logError(message);
|
} else
|
{
|
Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG
|
.get(serverId, rsServerId, baseDN.toNormalizedString(),
|
session.getReadableRemoteAddress(),
|
getGenerationID(),
|
electedRsInfo.getGenerationId());
|
logError(message);
|
}
|
} else
|
{
|
/*
|
* This server could not find any replicationServer. It's going to start
|
* in degraded mode. Log a message.
|
*/
|
connected = false;
|
replicationServer = NO_CONNECTED_SERVER;
|
|
if (!connectionError)
|
{
|
connectionError = true;
|
connectPhaseLock.notify();
|
|
if (replicationServerInfos.size() > 0)
|
{
|
Message message = WARN_COULD_NOT_FIND_CHANGELOG.get(
|
serverId, baseDN.toNormalizedString(),
|
collectionToString(replicationServerInfos.keySet(), ", "));
|
logError(message);
|
}
|
else
|
{
|
Message message = WARN_NO_AVAILABLE_CHANGELOGS.get(
|
serverId, baseDN.toNormalizedString());
|
logError(message);
|
}
|
}
|
}
|
}
|
}
|
|
/**
|
* Connects to a replication server.
|
*
|
* @param rsInfo
|
* the Replication Server to connect to
|
* @param initStatus
|
* The status to enter the state machine with
|
* @param topologyMsg
|
* the message containing the topology information
|
*/
|
private void connectToReplicationServer(ReplicationServerInfo rsInfo,
|
ServerStatus initStatus, TopologyMsg topologyMsg)
|
{
|
final int serverId = getServerId();
|
final DN baseDN = getBaseDN();
|
try
|
{
|
replicationServer = session.getReadableRemoteAddress();
|
maxSendWindow = rsInfo.getWindowSize();
|
rsGroupId = rsInfo.getGroupId();
|
rsServerId = rsInfo.getServerId();
|
rsServerUrl = rsInfo.getServerURL();
|
|
receiveTopo(topologyMsg);
|
|
/*
|
Log a message to let the administrator know that the failure
|
was resolved.
|
Wake up all the thread that were waiting on the window
|
on the previous connection.
|
*/
|
connectionError = false;
|
if (sendWindow != null)
|
{
|
/*
|
* Fix (hack) for OPENDJ-401: we want to ensure that no threads holding
|
* this semaphore will get blocked when they acquire it. However, we
|
* also need to make sure that we don't overflow the semaphore by
|
* releasing too many permits.
|
*/
|
final int MAX_PERMITS = (Integer.MAX_VALUE >>> 2);
|
if (sendWindow.availablePermits() < MAX_PERMITS)
|
{
|
/*
|
* At least 2^29 acquisitions would need to occur for this to be
|
* insufficient. In addition, at least 2^30 releases would need to
|
* occur for this to potentially overflow. Hopefully this is unlikely
|
* to happen.
|
*/
|
sendWindow.release(MAX_PERMITS);
|
}
|
}
|
sendWindow = new Semaphore(maxSendWindow);
|
rcvWindow = getMaxRcvWindow();
|
connected = true;
|
|
/*
|
May have created a broker with null replication domain for
|
unit test purpose.
|
*/
|
if (domain != null)
|
{
|
domain.sessionInitiated(initStatus, rsInfo.getServerState(), rsInfo
|
.getGenerationId(), session);
|
}
|
|
final byte groupId = getGroupId();
|
if (getRsGroupId() != groupId)
|
{
|
/*
|
Connected to replication server with wrong group id:
|
warn user and start heartbeat monitor to recover when a server
|
with the right group id shows up.
|
*/
|
Message message = WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
|
Byte.toString(groupId), Integer.toString(rsServerId),
|
rsInfo.getServerURL(), Byte.toString(getRsGroupId()),
|
baseDN.toNormalizedString(), Integer.toString(serverId));
|
logError(message);
|
}
|
startRSHeartBeatMonitoring();
|
if (rsInfo.getProtocolVersion() >=
|
ProtocolVersion.REPLICATION_PROTOCOL_V3)
|
{
|
startChangeTimeHeartBeatPublishing();
|
}
|
}
|
catch (Exception e)
|
{
|
Message message = ERR_COMPUTING_FAKE_OPS.get(
|
baseDN.toNormalizedString(), rsInfo.getServerURL(),
|
e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
|
logError(message);
|
}
|
finally
|
{
|
if (!connected)
|
{
|
setSession(null);
|
}
|
}
|
}
|
|
/**
|
* Determines the status we are starting with according to our state and the
|
* RS state.
|
*
|
* @param rsGenId The generation id of the RS
|
* @param rsState The server state of the RS
|
* @param degradedStatusThreshold The degraded status threshold of the RS
|
* @param dsGenId The local generation id
|
* @return The initial status
|
*/
|
public ServerStatus computeInitialServerStatus(long rsGenId,
|
ServerState rsState, int degradedStatusThreshold, long dsGenId)
|
{
|
if (rsGenId == -1)
|
{
|
// RS has no generation id
|
return ServerStatus.NORMAL_STATUS;
|
}
|
else if (rsGenId != dsGenId)
|
{
|
// DS and RS do not have same generation id
|
return ServerStatus.BAD_GEN_ID_STATUS;
|
}
|
else
|
{
|
/*
|
DS and RS have same generation id
|
|
Determine if we are late or not to replay changes. RS uses a
|
threshold value for pending changes to be replayed by a DS to
|
determine if the DS is in normal status or in degraded status.
|
Let's compare the local and remote server state using this threshold
|
value to determine if we are late or not
|
*/
|
|
int nChanges = ServerState.diffChanges(rsState, state);
|
if (debugEnabled())
|
{
|
debugInfo("computed " + nChanges + " changes late.");
|
}
|
|
/*
|
Check status to know if it is relevant to change the status. Do not
|
take RSD lock to test. If we attempt to change the status whereas
|
we are in a status that do not allows that, this will be noticed by
|
the changeStatusFromStatusAnalyzer method. This allows to take the
|
lock roughly only when needed versus every sleep time timeout.
|
*/
|
if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold)
|
{
|
return ServerStatus.DEGRADED_STATUS;
|
}
|
// degradedStatusThreshold value of '0' means no degrading system used
|
// (no threshold): force normal status
|
return ServerStatus.NORMAL_STATUS;
|
}
|
}
|
|
|
|
/**
|
* Connect to the provided server performing the first phase handshake (start
|
* messages exchange) and return the reply message from the replication
|
* server, wrapped in a ReplicationServerInfo object.
|
*
|
* @param server
|
* Server to connect to.
|
* @param keepConnection
|
* Do we keep session opened or not after handshake. Use true if want
|
* to perform handshake phase 2 with the same session and keep the
|
* session to create as the current one.
|
* @param isECL
|
* Indicates whether or not the an ECL handshake is to be performed.
|
* @return The answer from the server . Null if could not get an answer.
|
*/
|
private ReplicationServerInfo performPhaseOneHandshake(
|
String server, boolean keepConnection, boolean isECL)
|
{
|
Session localSession = null;
|
Socket socket = null;
|
boolean hasConnected = false;
|
Message errorMessage = null;
|
|
try
|
{
|
// Open a socket connection to the next candidate.
|
socket = new Socket();
|
socket.setReceiveBufferSize(1000000);
|
socket.setTcpNoDelay(true);
|
int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
|
socket.connect(HostPort.valueOf(server).toInetSocketAddress(), timeoutMS);
|
localSession = replSessionSecurity.createClientSession(socket, timeoutMS);
|
boolean isSslEncryption = replSessionSecurity.isSslEncryption();
|
|
// Send our ServerStartMsg.
|
String url = socket.getLocalAddress().getHostName() + ":"
|
+ socket.getLocalPort();
|
StartMsg serverStartMsg;
|
if (!isECL)
|
{
|
serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
|
getMaxRcvWindow(), config.getHeartbeatInterval(), state,
|
getGenerationID(), isSslEncryption, getGroupId());
|
}
|
else
|
{
|
serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
|
getMaxRcvWindow(), config.getHeartbeatInterval(), state,
|
getGenerationID(), isSslEncryption, getGroupId());
|
}
|
localSession.publish(serverStartMsg);
|
|
// Read the ReplServerStartMsg or ReplServerStartDSMsg that should
|
// come back.
|
ReplicationMsg msg = localSession.receive();
|
if (debugEnabled())
|
{
|
debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n"
|
+ msg);
|
}
|
|
// Wrap received message in a server info object
|
ReplicationServerInfo replServerInfo = ReplicationServerInfo
|
.newInstance(msg, server);
|
|
// Sanity check
|
DN repDN = replServerInfo.getBaseDN();
|
if (!getBaseDN().equals(repDN))
|
{
|
errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(
|
repDN.toNormalizedString(), getBaseDN().toNormalizedString());
|
return null;
|
}
|
|
/*
|
* We have sent our own protocol version to the replication server. The
|
* replication server will use the same one (or an older one if it is an
|
* old replication server).
|
*/
|
final short localProtocolVersion = getCompatibleVersion(replServerInfo
|
.getProtocolVersion());
|
if (keepConnection)
|
{
|
protocolVersion = localProtocolVersion;
|
}
|
localSession.setProtocolVersion(localProtocolVersion);
|
|
if (!isSslEncryption)
|
{
|
localSession.stopEncryption();
|
}
|
|
hasConnected = true;
|
|
// If this connection as the one to use for sending and receiving
|
// updates, store it.
|
if (keepConnection)
|
{
|
setSession(localSession);
|
}
|
|
return replServerInfo;
|
}
|
catch (ConnectException e)
|
{
|
errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(),
|
server, getBaseDN().toNormalizedString());
|
return null;
|
}
|
catch (SocketTimeoutException e)
|
{
|
errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(),
|
server, getBaseDN().toNormalizedString());
|
return null;
|
}
|
catch (Exception e)
|
{
|
errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(getServerId(),
|
server, getBaseDN().toNormalizedString(),
|
stackTraceToSingleLineString(e));
|
return null;
|
}
|
finally
|
{
|
if (!hasConnected || !keepConnection)
|
{
|
close(localSession);
|
close(socket);
|
}
|
|
if (!hasConnected && errorMessage != null)
|
{
|
// There was no server waiting on this host:port Log a notice and try
|
// the next replicationServer in the list
|
if (!connectionError)
|
{
|
if (keepConnection) // Log error message only for final connection
|
{
|
// the error message is only logged once to avoid overflowing
|
// the error log
|
logError(errorMessage);
|
}
|
|
if (debugEnabled())
|
{
|
TRACER.debugInfo(errorMessage.toString());
|
}
|
}
|
}
|
}
|
}
|
|
|
|
/**
|
* Performs the second phase handshake for External Change Log (send
|
* StartSessionMsg and receive TopologyMsg messages exchange) and return the
|
* reply message from the replication server.
|
*
|
* @param server Server we are connecting with.
|
*/
|
private void performECLPhaseTwoHandshake(String server)
|
{
|
try
|
{
|
// Send our Start Session
|
StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
|
startECLSessionMsg.setOperationId("-1");
|
final Session localSession = session;
|
localSession.publish(startECLSessionMsg);
|
|
// FIXME ECL In the handshake phase two, should RS send back a topo msg ?
|
if (debugEnabled())
|
{
|
TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg);
|
}
|
|
// Alright set the timeout to the desired value
|
localSession.setSoTimeout(timeout);
|
connected = true;
|
} catch (Exception e)
|
{
|
Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
|
getServerId(), server, getBaseDN().toNormalizedString(),
|
stackTraceToSingleLineString(e));
|
logError(message);
|
|
setSession(null);
|
}
|
}
|
|
/**
|
* Performs the second phase handshake (send StartSessionMsg and receive
|
* TopologyMsg messages exchange) and return the reply message from the
|
* replication server.
|
*
|
* @param server Server we are connecting with.
|
* @param initStatus The status we are starting with
|
* @return The ReplServerStartMsg the server replied. Null if could not
|
* get an answer.
|
*/
|
private TopologyMsg performPhaseTwoHandshake(String server,
|
ServerStatus initStatus)
|
{
|
try
|
{
|
/*
|
* Send our StartSessionMsg.
|
*/
|
StartSessionMsg startSessionMsg;
|
// May have created a broker with null replication domain for
|
// unit test purpose.
|
if (domain != null)
|
{
|
startSessionMsg =
|
new StartSessionMsg(
|
initStatus,
|
domain.getRefUrls(),
|
domain.isAssured(),
|
domain.getAssuredMode(),
|
domain.getAssuredSdLevel());
|
startSessionMsg.setEclIncludes(
|
domain.getEclIncludes(domain.getServerId()),
|
domain.getEclIncludesForDeletes(domain.getServerId()));
|
}
|
else
|
{
|
startSessionMsg =
|
new StartSessionMsg(initStatus, new ArrayList<String>());
|
}
|
final Session localSession = session;
|
localSession.publish(startSessionMsg);
|
|
/*
|
* Read the TopologyMsg that should come back.
|
*/
|
TopologyMsg topologyMsg = (TopologyMsg) localSession.receive();
|
|
if (debugEnabled())
|
{
|
TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg
|
+ "\nAND RECEIVED:\n" + topologyMsg);
|
}
|
|
// Alright set the timeout to the desired value
|
localSession.setSoTimeout(timeout);
|
return topologyMsg;
|
} catch (Exception e)
|
{
|
Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
|
getServerId(), server, getBaseDN().toNormalizedString(),
|
stackTraceToSingleLineString(e));
|
logError(message);
|
|
setSession(null);
|
|
// Be sure to return null.
|
return null;
|
}
|
}
|
|
/**
|
* Class holding evaluation results for electing the best replication server
|
* for the local directory server.
|
*/
|
static class RSEvaluations
|
{
|
|
private final int localServerId;
|
private Map<Integer, ReplicationServerInfo> bestRSs;
|
private final Map<Integer, Message> rsEvals =
|
new HashMap<Integer, Message>();
|
|
/**
|
* Ctor.
|
*
|
* @param localServerId
|
* the serverId for the local directory server
|
* @param rsInfos
|
* a Map of serverId => {@link ReplicationServerInfo} with all the
|
* candidate replication servers
|
*/
|
RSEvaluations(int localServerId,
|
Map<Integer, ReplicationServerInfo> rsInfos)
|
{
|
this.localServerId = localServerId;
|
this.bestRSs = rsInfos;
|
}
|
|
private boolean keepBest(LocalEvaluation eval)
|
{
|
if (eval.hasAcceptedAny())
|
{
|
bestRSs = eval.getAccepted();
|
rsEvals.putAll(eval.getRejected());
|
return true;
|
}
|
return false;
|
}
|
|
/**
|
* Sets the elected best replication server, rejecting all the other
|
* replication servers with the supplied evaluation.
|
*
|
* @param bestRsId
|
* the serverId of the elected replication server
|
* @param rejectedRSsEval
|
* the evaluation for all the rejected replication servers
|
*/
|
private void setBestRS(int bestRsId, Message rejectedRSsEval)
|
{
|
for (Iterator<Entry<Integer, ReplicationServerInfo>> it =
|
this.bestRSs.entrySet().iterator(); it.hasNext();)
|
{
|
final Entry<Integer, ReplicationServerInfo> entry = it.next();
|
final Integer rsId = entry.getKey();
|
final ReplicationServerInfo rsInfo = entry.getValue();
|
if (rsInfo.getServerId() != bestRsId)
|
{
|
it.remove();
|
}
|
rsEvals.put(rsId, rejectedRSsEval);
|
}
|
}
|
|
private void discardAll(Message eval)
|
{
|
for (Integer rsId : bestRSs.keySet())
|
{
|
rsEvals.put(rsId, eval);
|
}
|
}
|
|
private boolean foundBestRS()
|
{
|
return bestRSs.size() == 1;
|
}
|
|
/**
|
* Returns the {@link ReplicationServerInfo} for the best replication
|
* server.
|
*
|
* @return the {@link ReplicationServerInfo} for the best replication server
|
*/
|
ReplicationServerInfo getBestRS()
|
{
|
if (foundBestRS())
|
{
|
return bestRSs.values().iterator().next();
|
}
|
return null;
|
}
|
|
/**
|
* Returns the evaluations for all the candidate replication servers.
|
*
|
* @return a Map of serverId => Message containing the evaluation for each
|
* candidate replication servers.
|
*/
|
Map<Integer, Message> getEvaluations()
|
{
|
if (foundBestRS())
|
{
|
final Integer bestRSServerId = getBestRS().getServerId();
|
if (rsEvals.get(bestRSServerId) == null)
|
{
|
final Message eval = NOTE_BEST_RS.get(bestRSServerId, localServerId);
|
rsEvals.put(bestRSServerId, eval);
|
}
|
}
|
return Collections.unmodifiableMap(rsEvals);
|
}
|
|
/**
|
* Returns the evaluation for the supplied replication server Id.
|
* <p>
|
* Note: "unknown RS" message is returned if the supplied replication server
|
* was not part of the candidate replication servers.
|
*
|
* @param rsServerId
|
* the supplied replication server Id
|
* @return the evaluation {@link Message} for the supplied replication
|
* server Id
|
*/
|
private Message getEvaluation(int rsServerId)
|
{
|
final Message evaluation = getEvaluations().get(rsServerId);
|
if (evaluation != null)
|
{
|
return evaluation;
|
}
|
return NOTE_UNKNOWN_RS.get(rsServerId, localServerId);
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public String toString()
|
{
|
return new StringBuilder()
|
.append("Current best replication server Ids: ").append(
|
this.bestRSs.keySet()).append(
|
", Evaluation of connected replication servers").append(
|
" (ServerId => Evaluation): ").append(this.rsEvals.keySet())
|
.append(", Any replication server not appearing here").append(
|
" could not be contacted.").toString();
|
}
|
}
|
|
/**
|
* Evaluation local to one filter.
|
*/
|
private static class LocalEvaluation
|
{
|
private final Map<Integer, ReplicationServerInfo> filteredRSs =
|
new HashMap<Integer, ReplicationServerInfo>();
|
private final Map<ReplicationServerInfo, Message> rsEvals =
|
new HashMap<ReplicationServerInfo, Message>();
|
|
private void accept(Integer rsId, ReplicationServerInfo rsInfo)
|
{
|
this.rsEvals.remove(rsInfo); // undo reject
|
this.filteredRSs.put(rsId, rsInfo);
|
}
|
|
private void reject(ReplicationServerInfo rsInfo, Message reason)
|
{
|
this.filteredRSs.remove(rsInfo.getServerId()); // undo accept
|
this.rsEvals.put(rsInfo, reason);
|
}
|
|
private Map<Integer, ReplicationServerInfo> getAccepted()
|
{
|
return filteredRSs;
|
}
|
|
public Map<Integer, Message> getRejected()
|
{
|
final Map<Integer, Message> result = new HashMap<Integer, Message>();
|
for (Entry<ReplicationServerInfo, Message> entry : rsEvals.entrySet())
|
{
|
result.put(entry.getKey().getServerId(), entry.getValue());
|
}
|
return result;
|
}
|
|
private boolean hasAcceptedAny()
|
{
|
return !filteredRSs.isEmpty();
|
}
|
|
}
|
|
/**
|
* Returns the replication server that best fits our need so that we can
|
* connect to it or determine if we must disconnect from current one to
|
* re-connect to best server.
|
* <p>
|
* Note: this method is static for test purpose (access from unit tests)
|
*
|
* @param firstConnection True if we run this method for the very first
|
* connection of the broker. False if we run this method to determine if the
|
* replication server we are currently connected to is still the best or not.
|
* @param rsServerId The id of the replication server we are currently
|
* connected to. Only used when firstConnection is false.
|
* @param myState The local server state.
|
* @param rsInfos The list of available replication servers and their
|
* associated information (choice will be made among them).
|
* @param localServerId The server id for the suffix we are working for.
|
* @param groupId The groupId we prefer being connected to if possible
|
* @param generationId The generation id we are using
|
* @return The computed best replication server. If the returned value is
|
* null, the best replication server is undetermined but the local server must
|
* disconnect (so the best replication server is another one than the current
|
* one). Null can only be returned when firstConnection is false.
|
*/
|
public static RSEvaluations computeBestReplicationServer(
|
boolean firstConnection, int rsServerId, ServerState myState,
|
Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
|
byte groupId, long generationId)
|
{
|
final RSEvaluations evals = new RSEvaluations(localServerId, rsInfos);
|
// Shortcut, if only one server, this is the best
|
if (evals.foundBestRS())
|
{
|
return evals;
|
}
|
|
/**
|
* Apply some filtering criteria to determine the best servers list from
|
* the available ones. The ordered list of criteria is (from more important
|
* to less important):
|
* - replication server has the same group id as the local DS one
|
* - replication server has the same generation id as the local DS one
|
* - replication server is up to date regarding changes generated by the
|
* local DS
|
* - replication server in the same VM as local DS one
|
*/
|
/*
|
The list of best replication servers is filtered with each criteria. At
|
each criteria, the list is replaced with the filtered one if there
|
are some servers from the filtering, otherwise, the list is left as is
|
and the new filtering for the next criteria is applied and so on.
|
|
Use only servers locally configured: those are servers declared in
|
the local configuration. When the current method is called, for
|
sure, at least one server from the list is locally configured
|
*/
|
filterServersLocallyConfigured(evals, localServerId);
|
// Some servers with same group id ?
|
filterServersWithSameGroupId(evals, localServerId, groupId);
|
// Some servers with same generation id ?
|
final boolean rssWithSameGenerationIdExist =
|
filterServersWithSameGenerationId(evals, localServerId, generationId);
|
if (rssWithSameGenerationIdExist)
|
{
|
// If some servers with the right generation id this is useful to
|
// run the local DS change criteria
|
filterServersWithAllLocalDSChanges(evals, myState, localServerId);
|
}
|
// Some servers in the local VM or local host?
|
filterServersOnSameHost(evals, localServerId);
|
|
if (evals.foundBestRS())
|
{
|
return evals;
|
}
|
|
/**
|
* Now apply the choice based on the weight to the best servers list
|
*/
|
if (firstConnection)
|
{
|
// We are not connected to a server yet
|
computeBestServerForWeight(evals, -1, -1);
|
}
|
else
|
{
|
/*
|
* We are already connected to a RS: compute the best RS as far as the
|
* weights is concerned. If this is another one, some DS must disconnect.
|
*/
|
computeBestServerForWeight(evals, rsServerId, localServerId);
|
}
|
return evals;
|
}
|
|
/**
|
* Creates a new list that contains only replication servers that are locally
|
* configured.
|
* @param evals The evaluation object
|
*/
|
private static void filterServersLocallyConfigured(RSEvaluations evals,
|
int localServerId)
|
{
|
final LocalEvaluation eval = new LocalEvaluation();
|
for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
|
{
|
final Integer rsId = entry.getKey();
|
final ReplicationServerInfo rsInfo = entry.getValue();
|
if (rsInfo.isLocallyConfigured())
|
{
|
eval.accept(rsId, rsInfo);
|
}
|
else
|
{
|
eval.reject(rsInfo,
|
NOTE_RS_NOT_LOCALLY_CONFIGURED.get(rsId, localServerId));
|
}
|
}
|
evals.keepBest(eval);
|
}
|
|
/**
|
* Creates a new list that contains only replication servers that have the
|
* passed group id, from a passed replication server list.
|
* @param evals The evaluation object
|
* @param groupId The group id that must match
|
*/
|
private static void filterServersWithSameGroupId(RSEvaluations evals,
|
int localServerId, byte groupId)
|
{
|
final LocalEvaluation eval = new LocalEvaluation();
|
for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
|
{
|
final Integer rsId = entry.getKey();
|
final ReplicationServerInfo rsInfo = entry.getValue();
|
if (rsInfo.getGroupId() == groupId)
|
{
|
eval.accept(rsId, rsInfo);
|
}
|
else
|
{
|
eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS.get(
|
rsId, rsInfo.getGroupId(), localServerId, groupId));
|
}
|
}
|
evals.keepBest(eval);
|
}
|
|
/**
|
* Creates a new list that contains only replication servers that have the
|
* provided generation id, from a provided replication server list.
|
* When the selected replication servers have no change (empty serverState)
|
* then the 'empty'(generationId==-1) replication servers are also included
|
* in the result list.
|
*
|
* @param evals The evaluation object
|
* @param generationId The generation id that must match
|
* @return whether some replication server passed the filter
|
*/
|
private static boolean filterServersWithSameGenerationId(
|
RSEvaluations evals, long localServerId, long generationId)
|
{
|
final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
|
final LocalEvaluation eval = new LocalEvaluation();
|
boolean emptyState = true;
|
|
for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
|
{
|
final Integer rsId = entry.getKey();
|
final ReplicationServerInfo rsInfo = entry.getValue();
|
if (rsInfo.getGenerationId() == generationId)
|
{
|
eval.accept(rsId, rsInfo);
|
if (!rsInfo.serverState.isEmpty())
|
emptyState = false;
|
}
|
else if (rsInfo.getGenerationId() == -1)
|
{
|
eval.reject(rsInfo, NOTE_RS_HAS_NO_GENERATION_ID.get(rsId,
|
generationId, localServerId));
|
}
|
else
|
{
|
eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS.get(
|
rsId, rsInfo.getGenerationId(), localServerId, generationId));
|
}
|
}
|
|
if (emptyState)
|
{
|
// If the RS with a generationId have all an empty state,
|
// then the 'empty'(genId=-1) RSes are also candidate
|
for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
|
{
|
ReplicationServerInfo rsInfo = entry.getValue();
|
if (rsInfo.getGenerationId() == -1)
|
{
|
// will undo the reject of previously rejected RSs
|
eval.accept(entry.getKey(), rsInfo);
|
}
|
}
|
}
|
|
return evals.keepBest(eval);
|
}
|
|
/**
|
* Creates a new list that contains only replication servers that have the
|
* latest changes from the passed DS, from a passed replication server list.
|
* @param evals The evaluation object
|
* @param localState The state of the local DS
|
* @param localServerId The server id to consider for the changes
|
*/
|
private static void filterServersWithAllLocalDSChanges(
|
RSEvaluations evals, ServerState localState, int localServerId)
|
{
|
// Extract the CSN of the latest change generated by the local server
|
final CSN localCSN = getCSN(localState, localServerId);
|
|
/**
|
* Find replication servers that are up to date (or more up to date than us,
|
* if for instance we failed and restarted, having sent some changes to the
|
* RS but without having time to store our own state) regarding our own
|
* server id. If some servers are more up to date, prefer this list but take
|
* only the latest CSN.
|
*/
|
final LocalEvaluation mostUpToDateEval = new LocalEvaluation();
|
boolean foundRSMoreUpToDateThanLocalDS = false;
|
CSN latestRsCSN = null;
|
for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
|
{
|
final Integer rsId = entry.getKey();
|
final ReplicationServerInfo rsInfo = entry.getValue();
|
final CSN rsCSN = getCSN(rsInfo.getServerState(), localServerId);
|
|
// Has this replication server the latest local change ?
|
if (rsCSN.isOlderThan(localCSN))
|
{
|
mostUpToDateEval.reject(rsInfo, NOTE_RS_LATER_THAN_LOCAL_DS.get(
|
rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
|
}
|
else if (rsCSN.equals(localCSN))
|
{
|
// This replication server has exactly the latest change from the
|
// local server
|
if (!foundRSMoreUpToDateThanLocalDS)
|
{
|
mostUpToDateEval.accept(rsId, rsInfo);
|
}
|
else
|
{
|
mostUpToDateEval.reject(rsInfo,
|
NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
|
rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
|
}
|
}
|
else if (rsCSN.isNewerThan(localCSN))
|
{
|
// This replication server is even more up to date than the local server
|
if (latestRsCSN == null)
|
{
|
foundRSMoreUpToDateThanLocalDS = true;
|
// all previous results are now outdated, reject them all
|
rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
|
localCSN);
|
// Initialize the latest CSN
|
latestRsCSN = rsCSN;
|
}
|
|
if (rsCSN.equals(latestRsCSN))
|
{
|
mostUpToDateEval.accept(rsId, rsInfo);
|
}
|
else if (rsCSN.isNewerThan(latestRsCSN))
|
{
|
// This RS is even more up to date, reject all previously accepted RSs
|
// and store this new RS
|
rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
|
localCSN);
|
mostUpToDateEval.accept(rsId, rsInfo);
|
latestRsCSN = rsCSN;
|
}
|
else
|
{
|
mostUpToDateEval.reject(rsInfo,
|
NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
|
rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
|
}
|
}
|
}
|
evals.keepBest(mostUpToDateEval);
|
}
|
|
private static CSN getCSN(ServerState localState, int localServerId)
|
{
|
final CSN csn = localState.getCSN(localServerId);
|
if (csn != null)
|
{
|
return csn;
|
}
|
return new CSN(0, 0, localServerId);
|
}
|
|
private static void rejectAllWithRSIsLaterThanBestRS(
|
final LocalEvaluation eval, int localServerId, CSN localCSN)
|
{
|
for (ReplicationServerInfo rsInfo : eval.getAccepted().values())
|
{
|
final String rsCSN =
|
getCSN(rsInfo.getServerState(), localServerId).toStringUI();
|
final Message reason =
|
NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
|
rsInfo.getServerId(), rsCSN, localServerId, localCSN.toStringUI());
|
eval.reject(rsInfo, reason);
|
}
|
}
|
|
/**
|
* Creates a new list that contains only replication servers that are on the
|
* same host as the local DS, from a passed replication server list. This
|
* method will gives priority to any replication server which is in the same
|
* VM as this DS.
|
*
|
* @param evals The evaluation object
|
*/
|
private static void filterServersOnSameHost(RSEvaluations evals,
|
int localServerId)
|
{
|
/*
|
* Initially look for all servers on the same host. If we find one in the
|
* same VM, then narrow the search.
|
*/
|
boolean foundRSInSameVM = false;
|
final LocalEvaluation eval = new LocalEvaluation();
|
for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
|
{
|
final Integer rsId = entry.getKey();
|
final ReplicationServerInfo rsInfo = entry.getValue();
|
final HostPort hp = HostPort.valueOf(rsInfo.getServerURL());
|
if (hp.isLocalAddress())
|
{
|
if (isLocalReplicationServerPort(hp.getPort()))
|
{
|
if (!foundRSInSameVM)
|
{
|
// An RS in the same VM will always have priority.
|
// Narrow the search to only include servers in this VM.
|
rejectAllWithRSOnDifferentVMThanDS(eval, localServerId);
|
foundRSInSameVM = true;
|
}
|
eval.accept(rsId, rsInfo);
|
}
|
else if (!foundRSInSameVM)
|
{
|
// OK, accept RSs on the same machine because we have not found an RS
|
// in the same VM yet
|
eval.accept(rsId, rsInfo);
|
}
|
else
|
{
|
// Skip: we have found some RSs in the same VM, but this RS is not.
|
eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(rsId,
|
localServerId));
|
}
|
}
|
else
|
{
|
eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_HOST_THAN_DS.get(rsId,
|
localServerId));
|
}
|
}
|
evals.keepBest(eval);
|
}
|
|
private static void rejectAllWithRSOnDifferentVMThanDS(LocalEvaluation eval,
|
int localServerId)
|
{
|
for (ReplicationServerInfo rsInfo : eval.getAccepted().values())
|
{
|
eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(
|
rsInfo.getServerId(), localServerId));
|
}
|
}
|
|
/**
|
* Computes the best replication server the local server should be connected
|
* to so that the load is correctly spread across the topology, following the
|
* weights guidance.
|
* Warning: This method is expected to be called with at least 2 servers in
|
* bestServers
|
* Note: this method is static for test purpose (access from unit tests)
|
* @param evals The evaluation object
|
* @param currentRsServerId The replication server the local server is
|
* currently connected to. -1 if the local server is not yet connected
|
* to any replication server.
|
* @param localServerId The server id of the local server. This is not used
|
* when it is not connected to a replication server
|
* (currentRsServerId = -1)
|
*/
|
public static void computeBestServerForWeight(RSEvaluations evals,
|
int currentRsServerId, int localServerId)
|
{
|
final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
|
/*
|
* - Compute the load goal of each RS, deducing it from the weights affected
|
* to them.
|
* - Compute the current load of each RS, deducing it from the DSs
|
* currently connected to them.
|
* - Compute the differences between the load goals and the current loads of
|
* the RSs.
|
*/
|
// Sum of the weights
|
int sumOfWeights = 0;
|
// Sum of the connected DSs
|
int sumOfConnectedDSs = 0;
|
for (ReplicationServerInfo rsInfo : bestServers.values())
|
{
|
sumOfWeights += rsInfo.getWeight();
|
sumOfConnectedDSs += rsInfo.getConnectedDSNumber();
|
}
|
|
// Distance (difference) of the current loads to the load goals of each RS:
|
// key:server id, value: distance
|
Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
|
// Precision for the operations (number of digits after the dot)
|
final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
|
for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
|
{
|
final Integer rsId = entry.getKey();
|
final ReplicationServerInfo rsInfo = entry.getValue();
|
|
// load goal = rs weight / sum of weights
|
BigDecimal loadGoalBd = BigDecimal.valueOf(rsInfo.getWeight()).divide(
|
BigDecimal.valueOf(sumOfWeights), mathContext);
|
BigDecimal currentLoadBd = BigDecimal.ZERO;
|
if (sumOfConnectedDSs != 0)
|
{
|
// current load = number of connected DSs / total number of DSs
|
int connectedDSs = rsInfo.getConnectedDSNumber();
|
currentLoadBd = BigDecimal.valueOf(connectedDSs).divide(
|
BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
|
}
|
// load distance = load goal - current load
|
BigDecimal loadDistanceBd =
|
loadGoalBd.subtract(currentLoadBd, mathContext);
|
loadDistances.put(rsId, loadDistanceBd);
|
}
|
|
if (currentRsServerId == -1)
|
{
|
// The local server is not connected yet, find best server to connect to,
|
// taking the weights into account.
|
computeBestServerWhenNotConnected(evals, loadDistances, localServerId);
|
}
|
else
|
{
|
// The local server is currently connected to a RS, let's see if it must
|
// disconnect or not, taking the weights into account.
|
computeBestServerWhenConnected(evals, loadDistances, localServerId,
|
currentRsServerId, sumOfWeights, sumOfConnectedDSs);
|
}
|
}
|
|
private static void computeBestServerWhenNotConnected(RSEvaluations evals,
|
Map<Integer, BigDecimal> loadDistances, int localServerId)
|
{
|
final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
|
/*
|
* Find the server with the current highest distance to its load goal and
|
* choose it. Make an exception if every server is correctly balanced,
|
* that is every current load distances are equal to 0, in that case,
|
* choose the server with the highest weight
|
*/
|
int bestRsId = 0; // If all server equal, return the first one
|
float highestDistance = Float.NEGATIVE_INFINITY;
|
boolean allRsWithZeroDistance = true;
|
int highestWeightRsId = -1;
|
int highestWeight = -1;
|
for (Integer rsId : bestServers.keySet())
|
{
|
float loadDistance = loadDistances.get(rsId).floatValue();
|
if (loadDistance > highestDistance)
|
{
|
// This server is far more from its balance point
|
bestRsId = rsId;
|
highestDistance = loadDistance;
|
}
|
if (loadDistance != 0)
|
{
|
allRsWithZeroDistance = false;
|
}
|
int weight = bestServers.get(rsId).getWeight();
|
if (weight > highestWeight)
|
{
|
// This server has a higher weight
|
highestWeightRsId = rsId;
|
highestWeight = weight;
|
}
|
}
|
// All servers with a 0 distance ?
|
if (allRsWithZeroDistance)
|
{
|
// Choose server with the highest weight
|
bestRsId = highestWeightRsId;
|
}
|
evals.setBestRS(bestRsId, NOTE_BIGGEST_WEIGHT_RS.get(localServerId,
|
bestRsId));
|
}
|
|
private static void computeBestServerWhenConnected(RSEvaluations evals,
|
Map<Integer, BigDecimal> loadDistances, int localServerId,
|
int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
|
{
|
final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
|
final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
|
float currentLoadDistance =
|
loadDistances.get(currentRsServerId).floatValue();
|
if (currentLoadDistance < 0)
|
{
|
/*
|
Too much DSs connected to the current RS, compared with its load
|
goal:
|
Determine the potential number of DSs to disconnect from the current
|
RS and see if the local DS is part of them: the DSs that must
|
disconnect are those with the lowest server id.
|
Compute the sum of the distances of the load goals of the other RSs
|
*/
|
BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
|
for (Integer rsId : bestServers.keySet())
|
{
|
if (rsId != currentRsServerId)
|
{
|
sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
|
loadDistances.get(rsId), mathContext);
|
}
|
}
|
|
if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
|
{
|
/*
|
The average distance of the other RSs shows a lack of DSs.
|
Compute the number of DSs to disconnect from the current RS,
|
rounding to the nearest integer number. Do only this if there is
|
no risk of yoyo effect: when the exact balance cannot be
|
established due to the current number of DSs connected, do not
|
disconnect a DS. A simple example where the balance cannot be
|
reached is:
|
- RS1 has weight 1 and 2 DSs
|
- RS2 has weight 1 and 1 DS
|
=> disconnecting a DS from RS1 to reconnect it to RS2 would have no
|
sense as this would lead to the reverse situation. In that case,
|
the perfect balance cannot be reached and we must stick to the
|
current situation, otherwise the DS would keep move between the 2
|
RSs
|
*/
|
float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
|
multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
|
.floatValue();
|
int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
|
|
// Avoid yoyo effect
|
if (overloadingDSsNumber == 1)
|
{
|
// What would be the new load distance for the current RS if
|
// we disconnect some DSs ?
|
ReplicationServerInfo currentReplicationServerInfo =
|
bestServers.get(currentRsServerId);
|
|
int currentRsWeight = currentReplicationServerInfo.getWeight();
|
BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
|
BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
|
BigDecimal currentRsLoadGoalBd =
|
currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
|
BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
|
if (sumOfConnectedDSs != 0)
|
{
|
int connectedDSs = currentReplicationServerInfo.
|
getConnectedDSNumber();
|
BigDecimal potentialNewConnectedDSsBd =
|
BigDecimal.valueOf(connectedDSs - 1);
|
BigDecimal sumOfConnectedDSsBd =
|
BigDecimal.valueOf(sumOfConnectedDSs);
|
potentialCurrentRsNewLoadBd =
|
potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
|
mathContext);
|
}
|
BigDecimal potentialCurrentRsNewLoadDistanceBd =
|
currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
|
mathContext);
|
|
// What would be the new load distance for the other RSs ?
|
BigDecimal additionalDsLoadBd =
|
BigDecimal.ONE.divide(
|
BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
|
BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
|
sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
|
mathContext);
|
|
/*
|
Now compare both values: we must not disconnect the DS if this
|
is for going in a situation where the load distance of the other
|
RSs is the opposite of the future load distance of the local RS
|
or we would evaluate that we should disconnect just after being
|
arrived on the new RS. But we should disconnect if we reach the
|
perfect balance (both values are 0).
|
*/
|
MathContext roundMc = new MathContext(6, RoundingMode.DOWN);
|
BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
|
potentialCurrentRsNewLoadDistanceBd.round(roundMc);
|
BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
|
potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
|
|
if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
|
BigDecimal.ZERO) != 0)
|
&& (potentialCurrentRsNewLoadDistanceBdRounded.equals(
|
potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
|
{
|
// Avoid the yoyo effect, and keep the local DS connected to its
|
// current RS
|
evals.setBestRS(currentRsServerId,
|
NOTE_AVOID_YOYO_EFFECT.get(localServerId, currentRsServerId));
|
return;
|
}
|
}
|
|
ReplicationServerInfo currentRsInfo =
|
bestServers.get(currentRsServerId);
|
if (isServerOverloadingRS(localServerId, currentRsInfo,
|
overloadingDSsNumber))
|
{
|
// The local server is part of the DSs to disconnect
|
evals.discardAll(NOTE_DISCONNECT_DS_FROM_OVERLOADED_RS.get(
|
localServerId, currentRsServerId));
|
}
|
else
|
{
|
// The local server is not part of the servers to disconnect from the
|
// current RS.
|
evals.setBestRS(currentRsServerId,
|
NOTE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS.get(localServerId,
|
currentRsServerId));
|
}
|
} else {
|
// The average distance of the other RSs does not show a lack of DSs:
|
// no need to disconnect any DS from the current RS.
|
evals.setBestRS(currentRsServerId,
|
NOTE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS.get(localServerId,
|
currentRsServerId));
|
}
|
} else {
|
// The RS load goal is reached or there are not enough DSs connected to
|
// it to reach it: do not disconnect from this RS and return rsInfo for
|
// this RS
|
evals.setBestRS(currentRsServerId,
|
NOTE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS.get(localServerId,
|
currentRsServerId));
|
}
|
}
|
|
/**
|
* Returns whether the local DS is overloading the RS.
|
* <p>
|
* There are an "overloadingDSsNumber" of DS overloading the RS. The list of
|
* DSs connected to this RS is ordered by serverId to use a consistent
|
* ordering across all nodes in the topology. The serverIds which index in the
|
* List are lower than "overloadingDSsNumber" will be evicted first.
|
* <p>
|
* This ordering is unfair since nodes with the lower serverIds will be
|
* evicted more often than nodes with higher serverIds. However, it is a
|
* consistent and reliable ordering applicable anywhere in the topology.
|
*/
|
private static boolean isServerOverloadingRS(int localServerId,
|
ReplicationServerInfo currentRsInfo, int overloadingDSsNumber)
|
{
|
List<Integer> serversConnectedToCurrentRS =
|
new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
|
Collections.sort(serversConnectedToCurrentRS);
|
|
final int idx = serversConnectedToCurrentRS.indexOf(localServerId);
|
return idx != -1 && idx < overloadingDSsNumber;
|
}
|
|
/**
|
* Start the heartbeat monitor thread.
|
*/
|
private void startRSHeartBeatMonitoring()
|
{
|
// Start a heartbeat monitor thread.
|
final long heartbeatInterval = config.getHeartbeatInterval();
|
if (heartbeatInterval > 0)
|
{
|
heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(),
|
getBaseDN().toNormalizedString(), session, heartbeatInterval);
|
heartbeatMonitor.start();
|
}
|
}
|
|
/**
|
* Stop the heartbeat monitor thread.
|
*/
|
synchronized void stopRSHeartBeatMonitoring()
|
{
|
if (heartbeatMonitor != null)
|
{
|
heartbeatMonitor.shutdown();
|
heartbeatMonitor = null;
|
}
|
}
|
|
/**
|
* restart the ReplicationBroker.
|
* @param infiniteTry the socket which failed
|
*/
|
public void reStart(boolean infiniteTry)
|
{
|
reStart(session, infiniteTry);
|
}
|
|
/**
|
* Restart the ReplicationServer broker after a failure.
|
*
|
* @param failingSession the socket which failed
|
* @param infiniteTry the socket which failed
|
*/
|
public void reStart(Session failingSession, boolean infiniteTry)
|
{
|
if (failingSession != null)
|
{
|
failingSession.close();
|
numLostConnections++;
|
}
|
|
if (failingSession == session)
|
{
|
connected = false;
|
rsGroupId = -1;
|
rsServerId = -1;
|
rsServerUrl = null;
|
setSession(null);
|
}
|
|
while (true)
|
{
|
// Synchronize inside the loop in order to allow shutdown.
|
synchronized (startStopLock)
|
{
|
if (connected || shutdown)
|
{
|
break;
|
}
|
|
try
|
{
|
connect();
|
}
|
catch (Exception e)
|
{
|
MessageBuilder mb = new MessageBuilder();
|
mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
|
getBaseDN().toNormalizedString(), e.getLocalizedMessage()));
|
mb.append(stackTraceToSingleLineString(e));
|
logError(mb.toMessage());
|
}
|
|
if (connected || !infiniteTry)
|
{
|
break;
|
}
|
|
}
|
try
|
{
|
Thread.sleep(500);
|
}
|
catch (InterruptedException e)
|
{
|
// ignore
|
}
|
}
|
|
if (debugEnabled())
|
{
|
debugInfo("end restart : connected=" + connected + " with RS("
|
+ getRsServerId() + ") genId=" + this.generationID);
|
}
|
}
|
|
/**
|
* Publish a message to the other servers.
|
* @param msg the message to publish
|
*/
|
public void publish(ReplicationMsg msg)
|
{
|
publish(msg, false, true);
|
}
|
|
/**
|
* Publish a message to the other servers.
|
* @param msg The message to publish.
|
* @param retryOnFailure Whether reconnect should automatically be done.
|
* @return Whether publish succeeded.
|
*/
|
public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
|
{
|
return publish(msg, false, retryOnFailure);
|
}
|
|
/**
|
* Publish a recovery message to the other servers.
|
* @param msg the message to publish
|
*/
|
public void publishRecovery(ReplicationMsg msg)
|
{
|
publish(msg, true, true);
|
}
|
|
/**
|
* Publish a message to the other servers.
|
* @param msg the message to publish
|
* @param recoveryMsg the message is a recovery Message
|
* @param retryOnFailure whether retry should be done on failure
|
* @return whether the message was successfully sent.
|
*/
|
private boolean publish(ReplicationMsg msg, boolean recoveryMsg,
|
boolean retryOnFailure)
|
{
|
boolean done = false;
|
|
while (!done && !shutdown)
|
{
|
if (connectionError)
|
{
|
/*
|
It was not possible to connect to any replication server.
|
Since the operation was already processed, we have no other
|
choice than to return without sending the ReplicationMsg
|
and relying on the resend procedure of the connect phase to
|
fix the problem when we finally connect.
|
*/
|
|
if (debugEnabled())
|
{
|
debugInfo("publish(): Publishing a message is not possible due to"
|
+ " existing connection error.");
|
}
|
|
return false;
|
}
|
|
try
|
{
|
/*
|
save the session at the time when we acquire the
|
sendwindow credit so that we can make sure later
|
that the session did not change in between.
|
This is necessary to make sure that we don't publish a message
|
on a session with a credit that was acquired from a previous
|
session.
|
*/
|
Session currentSession;
|
Semaphore currentWindowSemaphore;
|
synchronized (connectPhaseLock)
|
{
|
currentSession = session;
|
currentWindowSemaphore = sendWindow;
|
}
|
|
/*
|
If the Replication domain has decided that there is a need to
|
recover some changes then it is not allowed to send this
|
change but it will be the responsibility of the recovery thread to
|
do it.
|
*/
|
if (!recoveryMsg & connectRequiresRecovery)
|
{
|
return false;
|
}
|
|
boolean credit;
|
if (msg instanceof UpdateMsg)
|
{
|
/*
|
Acquiring the window credit must be done outside of the
|
connectPhaseLock because it can be blocking and we don't
|
want to hold off reconnection in case the connection dropped.
|
*/
|
credit =
|
currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
|
} else
|
{
|
credit = true;
|
}
|
|
if (credit)
|
{
|
synchronized (connectPhaseLock)
|
{
|
/*
|
session may have been set to null in the connection phase
|
when restarting the broker for example.
|
Check the session. If it has changed, some disconnection or
|
reconnection happened and we need to restart from scratch.
|
*/
|
final Session localSession = session;
|
if (localSession != null && session == currentSession)
|
{
|
localSession.publish(msg);
|
done = true;
|
}
|
}
|
}
|
if (!credit && currentWindowSemaphore.availablePermits() == 0)
|
{
|
synchronized (connectPhaseLock)
|
{
|
/*
|
the window is still closed.
|
Send a WindowProbeMsg message to wake up the receiver in case the
|
window update message was lost somehow...
|
then loop to check again if connection was closed.
|
*/
|
Session localSession = session;
|
if (localSession != null)
|
{
|
localSession.publish(new WindowProbeMsg());
|
}
|
}
|
}
|
} catch (IOException e)
|
{
|
if (!retryOnFailure)
|
return false;
|
|
// The receive threads should handle reconnection or
|
// mark this broker in error. Just retry.
|
synchronized (connectPhaseLock)
|
{
|
try
|
{
|
connectPhaseLock.wait(100);
|
} catch (InterruptedException e1)
|
{
|
// ignore
|
if (debugEnabled())
|
{
|
debugInfo("publish(): Interrupted exception raised : "
|
+ e.getLocalizedMessage());
|
}
|
}
|
}
|
} catch (InterruptedException e)
|
{
|
// just loop.
|
if (debugEnabled())
|
{
|
debugInfo("publish(): Interrupted exception raised."
|
+ e.getLocalizedMessage());
|
}
|
}
|
}
|
return true;
|
}
|
|
/**
|
* Receive a message.
|
* This method is not thread-safe and should either always be
|
* called in a single thread or protected by a locking mechanism
|
* before being called. This is a wrapper to the method with a boolean version
|
* so that we do not have to modify existing tests.
|
*
|
* @return the received message
|
* @throws SocketTimeoutException if the timeout set by setSoTimeout
|
* has expired
|
*/
|
public ReplicationMsg receive() throws SocketTimeoutException
|
{
|
return receive(false, true, false);
|
}
|
|
/**
|
* Receive a message.
|
* This method is not thread-safe and should either always be
|
* called in a single thread or protected by a locking mechanism
|
* before being called.
|
*
|
* @param reconnectToTheBestRS Whether broker will automatically switch
|
* to the best suitable RS.
|
* @param reconnectOnFailure Whether broker will automatically reconnect
|
* on failure.
|
* @param returnOnTopoChange Whether broker should return TopologyMsg
|
* received.
|
* @return the received message
|
*
|
* @throws SocketTimeoutException if the timeout set by setSoTimeout
|
* has expired
|
*/
|
public ReplicationMsg receive(boolean reconnectToTheBestRS,
|
boolean reconnectOnFailure, boolean returnOnTopoChange)
|
throws SocketTimeoutException
|
{
|
while (!shutdown)
|
{
|
if (reconnectOnFailure && !connected)
|
{
|
// infinite try to reconnect
|
reStart(null, true);
|
}
|
|
// Save session information for later in case we need it for log messages
|
// after the session has been closed and/or failed.
|
final Session localSession = session;
|
if (localSession == null)
|
{
|
// Must be shutting down.
|
break;
|
}
|
|
final int serverId = getServerId();
|
final DN baseDN = getBaseDN();
|
final int previousRsServerID = rsServerId;
|
try
|
{
|
ReplicationMsg msg = localSession.receive();
|
if (msg instanceof UpdateMsg)
|
{
|
synchronized (this)
|
{
|
rcvWindow--;
|
}
|
}
|
if (msg instanceof WindowMsg)
|
{
|
WindowMsg windowMsg = (WindowMsg) msg;
|
sendWindow.release(windowMsg.getNumAck());
|
}
|
else if (msg instanceof TopologyMsg)
|
{
|
TopologyMsg topoMsg = (TopologyMsg) msg;
|
receiveTopo(topoMsg);
|
if (reconnectToTheBestRS)
|
{
|
// Reset wait time before next computation of best server
|
mustRunBestServerCheckingAlgorithm = 0;
|
}
|
|
// Caller wants to check what's changed
|
if (returnOnTopoChange)
|
return msg;
|
|
}
|
else if (msg instanceof StopMsg)
|
{
|
// RS performs a proper disconnection
|
Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
|
previousRsServerID, localSession.getReadableRemoteAddress(),
|
serverId, baseDN.toNormalizedString());
|
logError(message);
|
|
// Try to find a suitable RS
|
reStart(localSession, true);
|
}
|
else if (msg instanceof MonitorMsg)
|
{
|
// This is the response to a MonitorRequest that was sent earlier or
|
// the regular message of the monitoring publisher of the RS.
|
MonitorMsg monitorMsg = (MonitorMsg) msg;
|
|
// Extract and store replicas ServerStates
|
final Map<Integer, ServerState> newReplicaStates =
|
new HashMap<Integer, ServerState>();
|
for (int srvId : toIterable(monitorMsg.ldapIterator()))
|
{
|
newReplicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
|
}
|
replicaStates = newReplicaStates;
|
|
// Notify the sender that the response was received.
|
synchronized (monitorResponse)
|
{
|
monitorResponse.set(true);
|
monitorResponse.notify();
|
}
|
|
// Update the replication servers ServerStates with new received info
|
for (int srvId : toIterable(monitorMsg.rsIterator()))
|
{
|
ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
|
if (rsInfo != null)
|
{
|
rsInfo.update(monitorMsg.getRSServerState(srvId));
|
}
|
}
|
|
/*
|
Now if it is allowed, compute the best replication server to see if
|
it is still the one we are currently connected to. If not,
|
disconnect properly and let the connection algorithm re-connect to
|
best replication server
|
*/
|
if (reconnectToTheBestRS)
|
{
|
mustRunBestServerCheckingAlgorithm++;
|
if (mustRunBestServerCheckingAlgorithm == 2)
|
{
|
// Stable topology (no topo msg since few seconds): proceed with
|
// best server checking.
|
final RSEvaluations evals =
|
computeBestReplicationServer(false, previousRsServerID, state,
|
replicationServerInfos, serverId, getGroupId(), generationID);
|
final ReplicationServerInfo bestServerInfo = evals.getBestRS();
|
if (previousRsServerID != -1
|
&& (bestServerInfo == null
|
|| bestServerInfo.getServerId() != previousRsServerID))
|
{
|
// The best replication server is no more the one we are
|
// currently using. Disconnect properly then reconnect.
|
Message message;
|
if (bestServerInfo == null)
|
{
|
message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
|
serverId, previousRsServerID,
|
localSession.getReadableRemoteAddress(),
|
baseDN.toNormalizedString());
|
}
|
else
|
{
|
final int bestRsServerId = bestServerInfo.getServerId();
|
message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
|
serverId, previousRsServerID,
|
localSession.getReadableRemoteAddress(),
|
bestRsServerId,
|
baseDN.toNormalizedString(),
|
evals.getEvaluation(previousRsServerID).toString(),
|
evals.getEvaluation(bestRsServerId).toString());
|
}
|
logError(message);
|
if (debugEnabled())
|
debugInfo("best replication servers evaluation results: "
|
+ evals);
|
reStart(true);
|
}
|
|
// Reset wait time before next computation of best server
|
mustRunBestServerCheckingAlgorithm = 0;
|
}
|
}
|
}
|
else
|
{
|
return msg;
|
}
|
}
|
catch (SocketTimeoutException e)
|
{
|
throw e;
|
}
|
catch (Exception e)
|
{
|
if (debugEnabled())
|
{
|
TRACER.debugCaught(DebugLogLevel.ERROR, e);
|
}
|
|
if (!shutdown)
|
{
|
final Session tmpSession = session;
|
if (tmpSession == null || !tmpSession.closeInitiated())
|
{
|
// We did not initiate the close on our side, log an error message.
|
Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
|
serverId, baseDN.toNormalizedString(), previousRsServerID,
|
localSession.getReadableRemoteAddress());
|
logError(message);
|
}
|
|
if (reconnectOnFailure)
|
{
|
reStart(localSession, true);
|
}
|
else
|
{
|
break; // does not seem necessary to explicitly disconnect ..
|
}
|
}
|
}
|
} // while !shutdown
|
return null;
|
}
|
|
/**
|
* Gets the States of all the Replicas currently in the Topology. When this
|
* method is called, a Monitoring message will be sent to the Replication
|
* Server to which this domain is currently connected so that it computes a
|
* table containing information about all Directory Servers in the topology.
|
* This Computation involves communications will all the servers currently
|
* connected and
|
*
|
* @return The States of all Replicas in the topology (except us)
|
*/
|
public Map<Integer, ServerState> getReplicaStates()
|
{
|
monitorResponse.set(false);
|
|
// publish Monitor Request Message to the Replication Server
|
publish(new MonitorRequestMsg(getServerId(), getRsServerId()));
|
|
// wait for Response up to 10 seconds.
|
try
|
{
|
synchronized (monitorResponse)
|
{
|
if (!monitorResponse.get())
|
{
|
monitorResponse.wait(10000);
|
}
|
}
|
} catch (InterruptedException e)
|
{
|
Thread.currentThread().interrupt();
|
}
|
return replicaStates;
|
}
|
|
/**
|
* This method allows to do the necessary computing for the window
|
* management after treatment by the worker threads.
|
*
|
* This should be called once the replay thread have done their job
|
* and the window can be open again.
|
*/
|
public synchronized void updateWindowAfterReplay()
|
{
|
try
|
{
|
updateDoneCount++;
|
final Session localSession = session;
|
if (updateDoneCount >= halfRcvWindow && localSession != null)
|
{
|
localSession.publish(new WindowMsg(updateDoneCount));
|
rcvWindow += updateDoneCount;
|
updateDoneCount = 0;
|
}
|
} catch (IOException e)
|
{
|
// Any error on the socket will be handled by the thread calling receive()
|
// just ignore.
|
}
|
}
|
|
/**
|
* stop the server.
|
*/
|
public void stop()
|
{
|
if (debugEnabled())
|
debugInfo("is stopping and will close the connection to"
|
+ " replication server " + rsServerId);
|
|
synchronized (startStopLock)
|
{
|
shutdown = true;
|
connected = false;
|
stopRSHeartBeatMonitoring();
|
stopChangeTimeHeartBeatPublishing();
|
replicationServer = "stopped";
|
rsGroupId = -1;
|
rsServerId = -1;
|
rsServerUrl = null;
|
setSession(null);
|
deregisterReplicationMonitor();
|
}
|
}
|
|
/**
|
* Set a timeout value.
|
* With this option set to a non-zero value, calls to the receive() method
|
* block for only this amount of time after which a
|
* java.net.SocketTimeoutException is raised.
|
* The Broker is valid and usable even after such an Exception is raised.
|
*
|
* @param timeout the specified timeout, in milliseconds.
|
* @throws SocketException if there is an error in the underlying protocol,
|
* such as a TCP error.
|
*/
|
public void setSoTimeout(int timeout) throws SocketException
|
{
|
this.timeout = timeout;
|
final Session localSession = session;
|
if (localSession != null)
|
{
|
localSession.setSoTimeout(timeout);
|
}
|
}
|
|
/**
|
* Get the name of the replicationServer to which this broker is currently
|
* connected.
|
*
|
* @return the name of the replicationServer to which this domain
|
* is currently connected.
|
*/
|
public String getReplicationServer()
|
{
|
return replicationServer;
|
}
|
|
/**
|
* Get the maximum receive window size.
|
*
|
* @return The maximum receive window size.
|
*/
|
public int getMaxRcvWindow()
|
{
|
return config.getWindowSize();
|
}
|
|
/**
|
* Get the current receive window size.
|
*
|
* @return The current receive window size.
|
*/
|
public int getCurrentRcvWindow()
|
{
|
return rcvWindow;
|
}
|
|
/**
|
* Get the maximum send window size.
|
*
|
* @return The maximum send window size.
|
*/
|
public int getMaxSendWindow()
|
{
|
return maxSendWindow;
|
}
|
|
/**
|
* Get the current send window size.
|
*
|
* @return The current send window size.
|
*/
|
public int getCurrentSendWindow()
|
{
|
if (connected)
|
{
|
return sendWindow.availablePermits();
|
}
|
return 0;
|
}
|
|
/**
|
* Get the number of times the connection was lost.
|
* @return The number of times the connection was lost.
|
*/
|
public int getNumLostConnections()
|
{
|
return numLostConnections;
|
}
|
|
/**
|
* Change some configuration parameters.
|
*
|
* @param newConfig The new config to use.
|
* @return A boolean indicating if the changes
|
* requires to restart the service.
|
*/
|
public boolean changeConfig(ReplicationDomainCfg newConfig)
|
{
|
// These parameters needs to be renegotiated with the ReplicationServer
|
// so if they have changed, that requires restarting the session with
|
// the ReplicationServer.
|
// A new session is necessary only when information regarding
|
// the connection is modified
|
boolean needToRestartSession =
|
!newConfig.getReplicationServer().equals(config.getReplicationServer())
|
|| newConfig.getWindowSize() != config.getWindowSize()
|
|| newConfig.getHeartbeatInterval() != config.getHeartbeatInterval()
|
|| newConfig.getGroupId() != config.getGroupId();
|
|
this.config = newConfig;
|
this.rcvWindow = newConfig.getWindowSize();
|
this.halfRcvWindow = this.rcvWindow / 2;
|
|
return needToRestartSession;
|
}
|
|
/**
|
* Get the version of the replication protocol.
|
* @return The version of the replication protocol.
|
*/
|
public short getProtocolVersion()
|
{
|
return protocolVersion;
|
}
|
|
/**
|
* Check if the broker is connected to a ReplicationServer and therefore
|
* ready to received and send Replication Messages.
|
*
|
* @return true if the server is connected, false if not.
|
*/
|
public boolean isConnected()
|
{
|
return connected;
|
}
|
|
/**
|
* Determine whether the connection to the replication server is encrypted.
|
* @return true if the connection is encrypted, false otherwise.
|
*/
|
public boolean isSessionEncrypted()
|
{
|
final Session tmp = session;
|
return tmp != null ? tmp.isEncrypted() : false;
|
}
|
|
/**
|
* Signals the RS we just entered a new status.
|
* @param newStatus The status the local DS just entered
|
*/
|
public void signalStatusChange(ServerStatus newStatus)
|
{
|
try
|
{
|
ChangeStatusMsg csMsg = new ChangeStatusMsg(ServerStatus.INVALID_STATUS,
|
newStatus);
|
session.publish(csMsg);
|
} catch (IOException ex)
|
{
|
Message message = ERR_EXCEPTION_SENDING_CS.get(
|
getBaseDN().toNormalizedString(),
|
Integer.toString(getServerId()),
|
ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex));
|
logError(message);
|
}
|
}
|
|
/**
|
* Gets the info for DSs in the topology (except us).
|
* @return The info for DSs in the topology (except us)
|
*/
|
public List<DSInfo> getDsList()
|
{
|
return dsList;
|
}
|
|
/**
|
* Gets the info for RSs in the topology (except the one we are connected
|
* to).
|
* @return The info for RSs in the topology (except the one we are connected
|
* to)
|
*/
|
public List<RSInfo> getRsList()
|
{
|
final List<RSInfo> result = new ArrayList<RSInfo>();
|
for (ReplicationServerInfo rsInfo : replicationServerInfos.values())
|
{
|
result.add(rsInfo.toRSInfo());
|
}
|
return result;
|
}
|
|
/**
|
* Computes the list of DSs connected to a particular RS.
|
* @param rsId The RS id of the server one wants to know the connected DSs
|
* @param dsList The list of DSinfo from which to compute things
|
* @return The list of connected DSs to the server rsId
|
*/
|
private List<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList)
|
{
|
List<Integer> connectedDSs = new ArrayList<Integer>();
|
|
if (rsServerId == rsId)
|
{
|
/*
|
If we are computing connected DSs for the RS we are connected
|
to, we should count the local DS as the DSInfo of the local DS is not
|
sent by the replication server in the topology message. We must count
|
ourselves as a connected server.
|
*/
|
connectedDSs.add(getServerId());
|
}
|
|
for (DSInfo dsInfo : dsList)
|
{
|
if (dsInfo.getRsId() == rsId)
|
connectedDSs.add(dsInfo.getDsId());
|
}
|
|
return connectedDSs;
|
}
|
|
/**
|
* Processes an incoming TopologyMsg.
|
* Updates the structures for the local view of the topology.
|
*
|
* @param topoMsg The topology information received from RS.
|
*/
|
public void receiveTopo(TopologyMsg topoMsg)
|
{
|
if (debugEnabled())
|
debugInfo("receive TopologyMsg=" + topoMsg);
|
|
// Store new DS list
|
dsList = topoMsg.getDsList();
|
|
// Update replication server info list with the received topology
|
// information
|
final Set<Integer> rssToKeep = new HashSet<Integer>();
|
for (RSInfo rsInfo : topoMsg.getRsList())
|
{
|
int rsId = rsInfo.getId();
|
rssToKeep.add(rsId); // Mark this server as still existing
|
List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
|
ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId);
|
if (rsInfo2 == null)
|
{
|
// New replication server, create info for it add it to the list
|
rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs);
|
// Set the locally configured flag for this new RS only if it is
|
// configured
|
updateRSInfoLocallyConfiguredStatus(rsInfo2);
|
replicationServerInfos.put(rsId, rsInfo2);
|
} else
|
{
|
// Update the existing info for the replication server
|
rsInfo2.update(rsInfo, connectedDSs);
|
}
|
}
|
|
/**
|
* Now remove any replication server that may have disappeared from the
|
* topology.
|
*/
|
Iterator<Integer> rsInfoIt = replicationServerInfos.keySet().iterator();
|
while (rsInfoIt.hasNext())
|
{
|
final Integer rsId = rsInfoIt.next();
|
if (!rssToKeep.contains(rsId))
|
{
|
// This replication server has quit the topology, remove it from the
|
// list
|
rsInfoIt.remove();
|
}
|
}
|
if (domain != null)
|
{
|
for (DSInfo info : dsList)
|
{
|
domain.setEclIncludes(info.getDsId(), info.getEclIncludes(),
|
info.getEclIncludesForDeletes());
|
}
|
}
|
}
|
|
/**
|
* Check if the broker could not find any Replication Server and therefore
|
* connection attempt failed.
|
*
|
* @return true if the server could not connect to any Replication Server.
|
*/
|
public boolean hasConnectionError()
|
{
|
return connectionError;
|
}
|
|
/**
|
* Starts publishing to the RS the current timestamp used in this server.
|
*/
|
private void startChangeTimeHeartBeatPublishing()
|
{
|
// Start a CSN heartbeat thread.
|
long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval();
|
if (changeTimeHeartbeatInterval > 0)
|
{
|
final Session localSession = session;
|
final String threadName = "Replica DS(" + getServerId()
|
+ ") change time heartbeat publisher for domain \""
|
+ getBaseDN() + "\" to RS(" + getRsServerId()
|
+ ") at " + localSession.getReadableRemoteAddress();
|
|
ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
|
threadName, localSession, changeTimeHeartbeatInterval, getServerId());
|
ctHeartbeatPublisherThread.start();
|
}
|
else
|
{
|
if (debugEnabled())
|
debugInfo("is not configured to send CSN heartbeat interval");
|
}
|
}
|
|
/**
|
* Stops publishing to the RS the current timestamp used in this server.
|
*/
|
private synchronized void stopChangeTimeHeartBeatPublishing()
|
{
|
if (ctHeartbeatPublisherThread != null)
|
{
|
ctHeartbeatPublisherThread.shutdown();
|
ctHeartbeatPublisherThread = null;
|
}
|
}
|
|
/**
|
* Set the connectRequiresRecovery to the provided value.
|
* This flag is used to indicate if a recovery of Update is necessary
|
* after a reconnection to a RS.
|
* It is the responsibility of the ReplicationDomain to set it during the
|
* sessionInitiated phase.
|
*
|
* @param b the new value of the connectRequiresRecovery.
|
*/
|
public void setRecoveryRequired(boolean b)
|
{
|
connectRequiresRecovery = b;
|
}
|
|
/**
|
* Returns whether the broker is shutting down.
|
* @return whether the broker is shutting down.
|
*/
|
public boolean shuttingDown()
|
{
|
return shutdown;
|
}
|
|
/**
|
* Returns the local address of this replication domain, or the empty string
|
* if it is not yet connected.
|
*
|
* @return The local address.
|
*/
|
String getLocalUrl()
|
{
|
final Session tmp = session;
|
return tmp != null ? tmp.getLocalUrl() : "";
|
}
|
|
/**
|
* Returns the replication monitor instance name associated with this broker.
|
*
|
* @return The replication monitor instance name.
|
*/
|
String getReplicationMonitorInstanceName()
|
{
|
// Only invoked by replication domain so always non-null.
|
return monitor.getMonitorInstanceName();
|
}
|
|
private void setSession(final Session newSession)
|
{
|
// De-register the monitor with the old name.
|
deregisterReplicationMonitor();
|
|
final Session oldSession = session;
|
if (oldSession != null)
|
{
|
oldSession.close();
|
}
|
session = newSession;
|
|
// Re-register the monitor with the new name.
|
registerReplicationMonitor();
|
}
|
|
private void registerReplicationMonitor()
|
{
|
/*
|
* The monitor should not be registered if this is a unit test because the
|
* replication domain is null.
|
*/
|
if (monitor != null)
|
{
|
DirectoryServer.registerMonitorProvider(monitor);
|
}
|
}
|
|
private void deregisterReplicationMonitor()
|
{
|
/*
|
* The monitor should not be deregistered if this is a unit test because the
|
* replication domain is null.
|
*/
|
if (monitor != null)
|
{
|
DirectoryServer.deregisterMonitorProvider(monitor);
|
}
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public String toString()
|
{
|
final StringBuilder sb = new StringBuilder();
|
sb.append(getClass().getSimpleName())
|
.append(" \"").append(getBaseDN()).append(" ")
|
.append(getServerId()).append("\",")
|
.append(" groupId=").append(getGroupId())
|
.append(", genId=").append(generationID)
|
.append(", connected=").append(connected).append(", ");
|
if (rsServerId == -1)
|
{
|
sb.append("no RS");
|
}
|
else
|
{
|
sb.append("bestRS(serverId=").append(rsServerId)
|
.append(", serverUrl=").append(rsServerUrl)
|
.append(", groupId=").append(rsGroupId)
|
.append(")");
|
}
|
return sb.toString();
|
}
|
|
private void debugInfo(String message)
|
{
|
TRACER.debugInfo(getClass().getSimpleName() + " for baseDN=" + getBaseDN()
|
+ " and serverId=" + getServerId() + " " + message);
|
}
|
}
|