From e13d1429c75364a349dee5d7f8703593fa0adf4f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 19 Dec 2013 16:31:41 +0000
Subject: [PATCH] Second step in simplifying the ReplicationBroker class.
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 628 +++++++++++++++++++++++++++-----------------------------
1 files changed, 301 insertions(+), 327 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index adc5046..3e464a0 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -71,44 +71,43 @@
/**
* Immutable class containing information about whether the broker is
* connected to an RS and data associated to this connected RS.
- * <p>
- * Mutable methods return a new version of this object copying the data that
- * did not change.
*/
// @Immutable
private static final class ConnectedRS
{
- private final String replicationServer;
+ private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(
+ NO_CONNECTED_SERVER);
+
/** The info of the RS we are connected to. */
private final ReplicationServerInfo rsInfo;
- private final boolean connected;
+ private final Session session;
+ private final String replicationServer;
- private ConnectedRS(boolean connected, ReplicationServerInfo rsInfo,
- String replicationServer)
+ private ConnectedRS(String replicationServer)
{
- this.connected = connected;
- this.rsInfo = rsInfo;
+ this.rsInfo = null;
+ this.session = null;
this.replicationServer = replicationServer;
}
+ private ConnectedRS(ReplicationServerInfo rsInfo, Session session)
+ {
+ this.rsInfo = rsInfo;
+ this.session = session;
+ this.replicationServer = session != null ?
+ session.getReadableRemoteAddress()
+ : NO_CONNECTED_SERVER;
+ }
+
private static ConnectedRS stopped()
{
- return new ConnectedRS(false, null, "stopped");
+ return new ConnectedRS("stopped");
}
private static ConnectedRS noConnectedRS()
{
- return new ConnectedRS(false, null, NO_CONNECTED_SERVER);
- }
-
- /**
- * Returns a new version of the current object with the connected status set
- * to true.
- */
- private ConnectedRS setConnected()
- {
- return new ConnectedRS(true, rsInfo, replicationServer);
+ return NO_CONNECTED_RS;
}
public int getServerId()
@@ -121,6 +120,11 @@
return rsInfo != null ? rsInfo.getGroupId() : -1;
}
+ private boolean isConnected()
+ {
+ return session != null;
+ }
+
/** {@inheritDoc} */
@Override
public String toString()
@@ -132,19 +136,20 @@
public void toString(StringBuilder sb)
{
- sb.append("connected=").append(connected).append(", ");
- if (rsInfo == null) // this is a null object
+ sb.append("connected=").append(isConnected()).append(", ");
+ if (!isConnected())
{
- sb.append("no connected RS");
+ sb.append("no connectedRS");
}
else
{
- sb.append("connected RS(serverId=").append(rsInfo.getServerId())
+ sb.append("connectedRS(serverId=").append(rsInfo.getServerId())
.append(", serverUrl=").append(rsInfo.getServerURL())
.append(", groupId=").append(rsInfo.getGroupId())
.append(")");
}
}
+
}
/**
@@ -158,18 +163,27 @@
* String reported under CSN=monitor when there is no connected RS.
*/
public final static String NO_CONNECTED_SERVER = "Not connected";
- private volatile Session session;
private final ServerState state;
private Semaphore sendWindow;
private int maxSendWindow;
private int rcvWindow = 100;
private int halfRcvWindow = rcvWindow / 2;
private int timeout = 0;
- private short protocolVersion;
private ReplSessionSecurity replSessionSecurity;
+ /**
+ * The RS this DS is currently connected to.
+ * <p>
+ * Always use {@link #setConnectedRS(ConnectedRS)} to set a new
+ * connected RS.
+ */
+ // @NotNull // for the reference
private final AtomicReference<ConnectedRS> connectedRS =
new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS());
- /** Our replication domain. */
+ /**
+ * Our replication domain.
+ * <p>
+ * Can be null for unit test purpose.
+ */
private ReplicationDomain domain;
/**
* This object is used as a conditional event to be notified about
@@ -242,11 +256,12 @@
private int mustRunBestServerCheckingAlgorithm = 0;
/**
- * The monitor provider for this replication domain. The name of the monitor
- * includes the local address and must therefore be re-registered every time
- * the session is re-established or destroyed. The monitor provider can only
- * be created (i.e. non-null) if there is a replication domain, which is not
- * the case in unit tests.
+ * The monitor provider for this replication domain.
+ * <p>
+ * The name of the monitor includes the local address and must therefore be
+ * re-registered every time the session is re-established or destroyed. The
+ * monitor provider can only be created (i.e. non-null) if there is a
+ * replication domain, which is not the case in unit tests.
*/
private final ReplicationMonitor monitor;
@@ -268,7 +283,6 @@
this.domain = replicationDomain;
this.state = state;
this.config = config;
- this.protocolVersion = ProtocolVersion.getCurrentVersion();
this.replSessionSecurity = replSessionSecurity;
this.generationID = generationId;
this.rcvWindow = getMaxRcvWindow();
@@ -292,7 +306,7 @@
{
shutdown = false;
this.rcvWindow = getMaxRcvWindow();
- connect(connectedRS.get());
+ connect();
}
}
@@ -385,7 +399,7 @@
{
// This RS is locally configured, mark this
rsInfo.setLocallyConfigured(true);
- rsInfo.serverURL = serverUrl;
+ rsInfo.setServerURL(serverUrl);
return;
}
}
@@ -425,22 +439,16 @@
*/
public static class ReplicationServerInfo
{
+ private RSInfo rsInfo;
private short protocolVersion;
- private long generationId;
- private byte groupId = -1;
- private int serverId;
- /** Received server URL. */
- private String serverURL;
private DN baseDN;
private int windowSize;
private ServerState serverState;
private boolean sslEncryption;
- private int degradedStatusThreshold = -1;
- /** Keeps the 1 value if created with a ReplServerStartMsg. */
- private int weight = 1;
+ private final int degradedStatusThreshold;
/** Keeps the 0 value if created with a ReplServerStartMsg. */
private int connectedDSNumber = 0;
- private List<Integer> connectedDSs;
+ private Set<Integer> connectedDSs;
/**
* Is this RS locally configured? (the RS is recognized as a usable server).
*/
@@ -458,8 +466,8 @@
public static ReplicationServerInfo newInstance(
ReplicationMsg msg, String newServerURL) throws IllegalArgumentException
{
- ReplicationServerInfo rsInfo = newInstance(msg);
- rsInfo.serverURL = newServerURL;
+ final ReplicationServerInfo rsInfo = newInstance(msg);
+ rsInfo.setServerURL(newServerURL);
return rsInfo;
}
@@ -471,70 +479,62 @@
* @throws IllegalArgumentException If the passed message has an unexpected
* type.
*/
- public static ReplicationServerInfo newInstance(
- ReplicationMsg msg) throws IllegalArgumentException
+ public static ReplicationServerInfo newInstance(ReplicationMsg msg)
+ throws IllegalArgumentException
{
if (msg instanceof ReplServerStartMsg)
{
- // This is a ReplServerStartMsg (RS uses protocol V3 or under)
- ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg) msg;
- return new ReplicationServerInfo(replServerStartMsg);
- } else if (msg instanceof ReplServerStartDSMsg)
+ // RS uses protocol V3 or lower
+ return new ReplicationServerInfo((ReplServerStartMsg) msg);
+ }
+ else if (msg instanceof ReplServerStartDSMsg)
{
- // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
- ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg) msg;
- return new ReplicationServerInfo(replServerStartDSMsg);
+ // RS uses protocol V4 or higher
+ return new ReplicationServerInfo((ReplServerStartDSMsg) msg);
}
// Unsupported message type: should not happen
- throw new IllegalArgumentException("Unexpected PDU type: " +
- msg.getClass().getName() + " :\n" + msg);
+ throw new IllegalArgumentException("Unexpected PDU type: "
+ + msg.getClass().getName() + " :\n" + msg);
}
/**
* Constructs a ReplicationServerInfo object wrapping a
* {@link ReplServerStartMsg}.
*
- * @param replServerStartMsg
+ * @param msg
* The {@link ReplServerStartMsg} this object will wrap.
*/
- private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg)
+ private ReplicationServerInfo(ReplServerStartMsg msg)
{
- this.protocolVersion = replServerStartMsg.getVersion();
- this.generationId = replServerStartMsg.getGenerationId();
- this.groupId = replServerStartMsg.getGroupId();
- this.serverId = replServerStartMsg.getServerId();
- this.serverURL = replServerStartMsg.getServerURL();
- this.baseDN = replServerStartMsg.getBaseDN();
- this.windowSize = replServerStartMsg.getWindowSize();
- this.serverState = replServerStartMsg.getServerState();
- this.sslEncryption = replServerStartMsg.getSSLEncryption();
- this.degradedStatusThreshold =
- replServerStartMsg.getDegradedStatusThreshold();
+ this.protocolVersion = msg.getVersion();
+ this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(),
+ msg.getGenerationId(), msg.getGroupId(), 1);
+ this.baseDN = msg.getBaseDN();
+ this.windowSize = msg.getWindowSize();
+ this.serverState = msg.getServerState();
+ this.sslEncryption = msg.getSSLEncryption();
+ this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
}
/**
* Constructs a ReplicationServerInfo object wrapping a
* {@link ReplServerStartDSMsg}.
*
- * @param replServerStartDSMsg
+ * @param msg
* The {@link ReplServerStartDSMsg} this object will wrap.
*/
- private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
+ private ReplicationServerInfo(ReplServerStartDSMsg msg)
{
- this.protocolVersion = replServerStartDSMsg.getVersion();
- this.generationId = replServerStartDSMsg.getGenerationId();
- this.groupId = replServerStartDSMsg.getGroupId();
- this.serverId = replServerStartDSMsg.getServerId();
- this.serverURL = replServerStartDSMsg.getServerURL();
- this.baseDN = replServerStartDSMsg.getBaseDN();
- this.windowSize = replServerStartDSMsg.getWindowSize();
- this.serverState = replServerStartDSMsg.getServerState();
- this.sslEncryption = replServerStartDSMsg.getSSLEncryption();
- this.degradedStatusThreshold =
- replServerStartDSMsg.getDegradedStatusThreshold();
- this.weight = replServerStartDSMsg.getWeight();
- this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber();
+ this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(),
+ msg.getGenerationId(), msg.getGroupId(), msg.getWeight());
+ this.protocolVersion = msg.getVersion();
+ this.baseDN = msg.getBaseDN();
+ this.windowSize = msg.getWindowSize();
+ this.serverState = msg.getServerState();
+ this.sslEncryption = msg.getSSLEncryption();
+ this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
+ this.connectedDSNumber = msg.getConnectedDSNumber();
}
/**
@@ -552,7 +552,7 @@
*/
public byte getGroupId()
{
- return groupId;
+ return rsInfo.getGroupId();
}
/**
@@ -570,7 +570,7 @@
*/
public long getGenerationId()
{
- return generationId;
+ return rsInfo.getGenerationId();
}
/**
@@ -579,7 +579,7 @@
*/
public int getServerId()
{
- return serverId;
+ return rsInfo.getId();
}
/**
@@ -588,7 +588,7 @@
*/
public String getServerURL()
{
- return serverURL;
+ return rsInfo.getServerUrl();
}
/**
@@ -635,7 +635,7 @@
*/
public int getWeight()
{
- return weight;
+ return rsInfo.getWeight();
}
/**
@@ -654,15 +654,13 @@
* @param rsInfo The RSinfo to use for the update
* @param connectedDSs The new connected DSs
*/
- public ReplicationServerInfo(RSInfo rsInfo, List<Integer> connectedDSs)
+ public ReplicationServerInfo(RSInfo rsInfo, Set<Integer> connectedDSs)
{
- this.serverId = rsInfo.getId();
- this.serverURL = rsInfo.getServerUrl();
- this.generationId = rsInfo.getGenerationId();
- this.groupId = rsInfo.getGroupId();
- this.weight = rsInfo.getWeight();
+ this.rsInfo = new RSInfo(rsInfo.getId(), rsInfo.getServerUrl(),
+ rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
this.connectedDSs = connectedDSs;
this.connectedDSNumber = connectedDSs.size();
+ this.degradedStatusThreshold = -1;
this.serverState = new ServerState();
}
@@ -672,7 +670,7 @@
*/
public RSInfo toRSInfo()
{
- return new RSInfo(serverId, serverURL, generationId, groupId, weight);
+ return rsInfo;
}
/**
@@ -681,15 +679,20 @@
* @param rsInfo The RSinfo to use for the update
* @param connectedDSs The new connected DSs
*/
- public void update(RSInfo rsInfo, List<Integer> connectedDSs)
+ public void update(RSInfo rsInfo, Set<Integer> connectedDSs)
{
- this.generationId = rsInfo.getGenerationId();
- this.groupId = rsInfo.getGroupId();
- this.weight = rsInfo.getWeight();
+ this.rsInfo = new RSInfo(this.rsInfo.getId(), this.rsInfo.getServerUrl(),
+ rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
this.connectedDSs = connectedDSs;
this.connectedDSNumber = connectedDSs.size();
}
+ private void setServerURL(String newServerURL)
+ {
+ rsInfo = new RSInfo(rsInfo.getId(), newServerURL,
+ rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
+ }
+
/**
* Updates replication server info with the passed server state.
* @param serverState The ServerState to use for the update
@@ -699,7 +702,8 @@
if (this.serverState != null)
{
this.serverState.update(serverState);
- } else
+ }
+ else
{
this.serverState = serverState;
}
@@ -709,7 +713,7 @@
* Get the getConnectedDSs.
* @return the getConnectedDSs
*/
- public List<Integer> getConnectedDSs()
+ public Set<Integer> getConnectedDSs()
{
return connectedDSs;
}
@@ -739,17 +743,17 @@
@Override
public String toString()
{
- return "Url:" + this.serverURL + " ServerId:" + this.serverId
- + " GroupId:" + this.groupId;
+ return "Url:" + getServerURL() + " ServerId:" + getServerId()
+ + " GroupId:" + getGroupId();
}
}
- private void connect(ConnectedRS rs)
+ private void connect()
{
if (getBaseDN().toNormalizedString().equalsIgnoreCase(
ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
{
- connectAsECL(rs);
+ connectAsECL();
}
else
{
@@ -770,8 +774,8 @@
for (String serverUrl : getReplicationServerUrls())
{
// Connect to server + get and store info about it
- ReplicationServerInfo rsInfo =
- performPhaseOneHandshake(serverUrl, false, false);
+ final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false, false);
+ final ReplicationServerInfo rsInfo = rs.rsInfo;
if (rsInfo != null)
{
rsInfos.put(rsInfo.getServerId(), rsInfo);
@@ -799,13 +803,14 @@
* </li>
* </ul>
*/
- private void connectAsECL(ConnectedRS rs)
+ private void connectAsECL()
{
// FIXME:ECL List of RS to connect is for now limited to one RS only
- final String bestServer = getReplicationServerUrls().iterator().next();
- if (performPhaseOneHandshake(bestServer, true, true) != null)
+ final String bestServerURL = getReplicationServerUrls().iterator().next();
+ final ConnectedRS rs = performPhaseOneHandshake(bestServerURL, true, true);
+ if (rs.isConnected())
{
- performECLPhaseTwoHandshake(bestServer, rs);
+ performECLPhaseTwoHandshake(bestServerURL, rs);
}
}
@@ -836,10 +841,6 @@
*/
private void connectAsDataServer()
{
- /*
- May have created a broker with null replication domain for
- unit test purpose.
- */
if (domain != null)
{
/*
@@ -876,22 +877,22 @@
if (replicationServerInfos.isEmpty())
{
- connectedRS.set(ConnectedRS.noConnectedRS());
+ setConnectedRS(ConnectedRS.noConnectedRS());
}
else
{
// At least one server answered, find the best one.
RSEvaluations evals = computeBestReplicationServer(true, -1, state,
replicationServerInfos, serverId, getGroupId(), getGenerationID());
- ReplicationServerInfo electedRsInfo = evals.getBestRS();
// Best found, now initialize connection to this one (handshake phase 1)
if (debugEnabled())
debugInfo("phase 2 : will perform PhaseOneH with the preferred RS="
- + electedRsInfo);
- electedRsInfo = performPhaseOneHandshake(
- electedRsInfo.getServerURL(), true, false);
+ + evals.getBestRS());
+ final ConnectedRS electedRS = performPhaseOneHandshake(
+ evals.getBestRS().getServerURL(), true, false);
+ final ReplicationServerInfo electedRsInfo = electedRS.rsInfo;
if (electedRsInfo != null)
{
/*
@@ -904,25 +905,24 @@
// Handshake phase 1 exchange went well
// Compute in which status we are starting the session to tell the RS
- ServerStatus initStatus =
- computeInitialServerStatus(electedRsInfo.getGenerationId(),
- electedRsInfo.getServerState(),
- electedRsInfo.getDegradedStatusThreshold(),
- getGenerationID());
+ final ServerStatus initStatus = computeInitialServerStatus(
+ electedRsInfo.getGenerationId(), electedRsInfo.getServerState(),
+ electedRsInfo.getDegradedStatusThreshold(), getGenerationID());
// Perform session start (handshake phase 2)
- TopologyMsg topologyMsg = performPhaseTwoHandshake(
- electedRsInfo.getServerURL(), initStatus);
+ final TopologyMsg topologyMsg =
+ performPhaseTwoHandshake(electedRS, initStatus);
if (topologyMsg != null) // Handshake phase 2 exchange went well
{
- connectToReplicationServer(electedRsInfo, initStatus, topologyMsg);
+ connectToReplicationServer(electedRS, initStatus, topologyMsg);
} // Could perform handshake phase 2 with best
} // Could perform handshake phase 1 with best
}
+ // connectedRS has been updated by calls above, reload it
final ConnectedRS rs = connectedRS.get();
- if (rs.connected)
+ if (rs.isConnected())
{
connectPhaseLock.notify();
@@ -932,13 +932,13 @@
{
logError(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
serverId, rsServerId, baseDN.toNormalizedString(),
- session.getReadableRemoteAddress(), getGenerationID()));
+ rs.replicationServer, getGenerationID()));
}
else
{
logError(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
serverId, rsServerId, baseDN.toNormalizedString(),
- session.getReadableRemoteAddress(), getGenerationID(), rsGenId));
+ rs.replicationServer, getGenerationID(), rsGenId));
}
}
else
@@ -969,29 +969,28 @@
/**
* Connects to a replication server.
*
- * @param rsInfo
+ * @param rs
* the Replication Server to connect to
* @param initStatus
* The status to enter the state machine with
* @param topologyMsg
* the message containing the topology information
*/
- private void connectToReplicationServer(ReplicationServerInfo rsInfo,
+ private void connectToReplicationServer(ConnectedRS rs,
ServerStatus initStatus, TopologyMsg topologyMsg)
{
- final int serverId = getServerId();
final DN baseDN = getBaseDN();
+ final ReplicationServerInfo rsInfo = rs.rsInfo;
- ConnectedRS rs = null;
+ boolean connectSuccessful = false;
try
{
maxSendWindow = rsInfo.getWindowSize();
- receiveTopo(topologyMsg);
+ receiveTopo(topologyMsg, rs.getServerId());
/*
- Log a message to let the administrator know that the failure
- was resolved.
+ Log a message to let the administrator know that the failure was resolved.
Wake up all the thread that were waiting on the window
on the previous connection.
*/
@@ -1018,17 +1017,11 @@
}
sendWindow = new Semaphore(maxSendWindow);
rcvWindow = getMaxRcvWindow();
- rs = new ConnectedRS(true, rsInfo, session.getReadableRemoteAddress());
- connectedRS.set(rs);
- /*
- May have created a broker with null replication domain for
- unit test purpose.
- */
if (domain != null)
{
- domain.sessionInitiated(initStatus, rsInfo.getServerState(), rsInfo
- .getGenerationId(), session);
+ domain.sessionInitiated(initStatus, rsInfo.getServerState(),
+ rsInfo.getGenerationId(), rs.session);
}
final byte groupId = getGroupId();
@@ -1042,28 +1035,28 @@
logError(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
Byte.toString(groupId), Integer.toString(rs.getServerId()),
rsInfo.getServerURL(), Byte.toString(rs.getGroupId()),
- baseDN.toNormalizedString(), Integer.toString(serverId)));
+ baseDN.toNormalizedString(), Integer.toString(getServerId())));
}
- startRSHeartBeatMonitoring();
+ startRSHeartBeatMonitoring(rs);
if (rsInfo.getProtocolVersion() >=
ProtocolVersion.REPLICATION_PROTOCOL_V3)
{
- startChangeTimeHeartBeatPublishing();
+ startChangeTimeHeartBeatPublishing(rs);
}
+ setConnectedRS(rs);
+ connectSuccessful = true;
}
catch (Exception e)
{
- Message message = ERR_COMPUTING_FAKE_OPS.get(
+ logError(ERR_COMPUTING_FAKE_OPS.get(
baseDN.toNormalizedString(), rsInfo.getServerURL(),
- e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
- logError(message);
+ e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e)));
}
finally
{
- if (rs == null)
+ if (!connectSuccessful)
{
- connectedRS.set(ConnectedRS.noConnectedRS());
- setSession(null);
+ setConnectedRS(ConnectedRS.noConnectedRS());
}
}
}
@@ -1133,9 +1126,9 @@
* messages exchange) and return the reply message from the replication
* server, wrapped in a ReplicationServerInfo object.
*
- * @param server
+ * @param serverURL
* Server to connect to.
- * @param keepConnection
+ * @param keepSession
* Do we keep session opened or not after handshake. Use true if want
* to perform handshake phase 2 with the same session and keep the
* session to create as the current one.
@@ -1143,10 +1136,10 @@
* Indicates whether or not the an ECL handshake is to be performed.
* @return The answer from the server . Null if could not get an answer.
*/
- private ReplicationServerInfo performPhaseOneHandshake(
- String server, boolean keepConnection, boolean isECL)
+ private ConnectedRS performPhaseOneHandshake(String serverURL,
+ boolean keepSession, boolean isECL)
{
- Session localSession = null;
+ Session newSession = null;
Socket socket = null;
boolean hasConnected = false;
Message errorMessage = null;
@@ -1158,15 +1151,16 @@
socket.setReceiveBufferSize(1000000);
socket.setTcpNoDelay(true);
int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
- socket.connect(HostPort.valueOf(server).toInetSocketAddress(), timeoutMS);
- localSession = replSessionSecurity.createClientSession(socket, timeoutMS);
+ socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(),
+ timeoutMS);
+ newSession = replSessionSecurity.createClientSession(socket, timeoutMS);
boolean isSslEncryption = replSessionSecurity.isSslEncryption();
// Send our ServerStartMsg.
final HostPort hp = new HostPort(
socket.getLocalAddress().getHostName(), socket.getLocalPort());
- String url = hp.toString();
- StartMsg serverStartMsg;
+ final String url = hp.toString();
+ final StartMsg serverStartMsg;
if (!isECL)
{
serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
@@ -1179,11 +1173,11 @@
getMaxRcvWindow(), config.getHeartbeatInterval(), state,
getGenerationID(), isSslEncryption, getGroupId());
}
- localSession.publish(serverStartMsg);
+ newSession.publish(serverStartMsg);
// Read the ReplServerStartMsg or ReplServerStartDSMsg that should
// come back.
- ReplicationMsg msg = localSession.receive();
+ ReplicationMsg msg = newSession.receive();
if (debugEnabled())
{
debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n"
@@ -1192,7 +1186,7 @@
// Wrap received message in a server info object
final ReplicationServerInfo replServerInfo =
- ReplicationServerInfo.newInstance(msg, server);
+ ReplicationServerInfo.newInstance(msg, serverURL);
// Sanity check
final DN repDN = replServerInfo.getBaseDN();
@@ -1200,7 +1194,7 @@
{
errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(
repDN.toNormalizedString(), getBaseDN().toNormalizedString());
- return null;
+ return setConnectedRS(ConnectedRS.noConnectedRS());
}
/*
@@ -1208,64 +1202,53 @@
* replication server will use the same one (or an older one if it is an
* old replication server).
*/
- final short localProtocolVersion = getCompatibleVersion(replServerInfo
- .getProtocolVersion());
- if (keepConnection)
- {
- protocolVersion = localProtocolVersion;
- }
- localSession.setProtocolVersion(localProtocolVersion);
+ newSession.setProtocolVersion(
+ getCompatibleVersion(replServerInfo.getProtocolVersion()));
if (!isSslEncryption)
{
- localSession.stopEncryption();
+ newSession.stopEncryption();
}
hasConnected = true;
- // If this connection is the one to use for sending and receiving
- // updates, store it.
- if (keepConnection)
+ if (keepSession)
{
- setSession(localSession);
+ // cannot store it yet,
+ // only store after a successful phase two handshake
+ return new ConnectedRS(replServerInfo, newSession);
}
-
- return replServerInfo;
+ return new ConnectedRS(replServerInfo, null);
}
catch (ConnectException e)
{
errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(),
- server, getBaseDN().toNormalizedString());
+ serverURL, getBaseDN().toNormalizedString());
}
catch (SocketTimeoutException e)
{
errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(),
- server, getBaseDN().toNormalizedString());
+ serverURL, getBaseDN().toNormalizedString());
}
catch (Exception e)
{
errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(getServerId(),
- server, getBaseDN().toNormalizedString(),
+ serverURL, getBaseDN().toNormalizedString(),
stackTraceToSingleLineString(e));
}
finally
{
- if (!hasConnected || !keepConnection)
+ if (!hasConnected || !keepSession)
{
- close(localSession);
+ close(newSession);
close(socket);
}
- if (keepConnection && !hasConnected)
- {
- connectedRS.set(ConnectedRS.noConnectedRS());
- }
-
if (!hasConnected && errorMessage != null && !connectionError)
{
// There was no server waiting on this host:port
// Log a notice and will try the next replicationServer in the list
- if (keepConnection) // Log error message only for final connection
+ if (keepSession) // Log error message only for final connection
{
// log the error message only once to avoid overflowing the error log
logError(errorMessage);
@@ -1277,7 +1260,7 @@
}
}
}
- return null;
+ return setConnectedRS(ConnectedRS.noConnectedRS());
}
@@ -1294,10 +1277,9 @@
try
{
// Send our Start Session
- StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
+ final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
startECLSessionMsg.setOperationId("-1");
- final Session localSession = session;
- localSession.publish(startECLSessionMsg);
+ rs.session.publish(startECLSessionMsg);
// FIXME ECL In the handshake phase two, should RS send back a topo msg ?
if (debugEnabled())
@@ -1306,18 +1288,17 @@
}
// Alright set the timeout to the desired value
- localSession.setSoTimeout(timeout);
- connectedRS.set(rs.setConnected());
+ rs.session.setSoTimeout(timeout);
+ setConnectedRS(rs);
}
catch (Exception e)
{
- Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
+ logError(WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
getServerId(), server, getBaseDN().toNormalizedString(),
- stackTraceToSingleLineString(e));
- logError(message);
+ stackTraceToSingleLineString(e)));
- connectedRS.set(ConnectedRS.noConnectedRS());
- setSession(null);
+ rs.session.close();
+ setConnectedRS(ConnectedRS.noConnectedRS());
}
}
@@ -1326,22 +1307,18 @@
* TopologyMsg messages exchange) and return the reply message from the
* replication server.
*
- * @param server Server we are connecting with.
+ * @param electedRS Server we are connecting with.
* @param initStatus The status we are starting with
* @return The ReplServerStartMsg the server replied. Null if could not
* get an answer.
*/
- private TopologyMsg performPhaseTwoHandshake(String server,
+ private TopologyMsg performPhaseTwoHandshake(ConnectedRS electedRS,
ServerStatus initStatus)
{
try
{
- /*
- * Send our StartSessionMsg.
- */
- StartSessionMsg startSessionMsg;
- // May have created a broker with null replication domain for
- // unit test purpose.
+ // Send our StartSessionMsg.
+ final StartSessionMsg startSessionMsg;
if (domain != null)
{
startSessionMsg = new StartSessionMsg(
@@ -1359,11 +1336,11 @@
startSessionMsg =
new StartSessionMsg(initStatus, new ArrayList<String>());
}
- final Session localSession = session;
- localSession.publish(startSessionMsg);
+ final Session session = electedRS.session;
+ session.publish(startSessionMsg);
// Read the TopologyMsg that should come back.
- final TopologyMsg topologyMsg = (TopologyMsg) localSession.receive();
+ final TopologyMsg topologyMsg = (TopologyMsg) session.receive();
if (debugEnabled())
{
@@ -1372,20 +1349,16 @@
}
// Alright set the timeout to the desired value
- localSession.setSoTimeout(timeout);
+ session.setSoTimeout(timeout);
return topologyMsg;
}
catch (Exception e)
{
- Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
- getServerId(), server, getBaseDN().toNormalizedString(),
- stackTraceToSingleLineString(e));
- logError(message);
+ logError(WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
+ getServerId(), electedRS.rsInfo.getServerURL(),
+ getBaseDN().toNormalizedString(), stackTraceToSingleLineString(e)));
- connectedRS.set(ConnectedRS.noConnectedRS());
- setSession(null);
-
- // Be sure to return null.
+ setConnectedRS(ConnectedRS.noConnectedRS());
return null;
}
}
@@ -2272,14 +2245,13 @@
/**
* Start the heartbeat monitor thread.
*/
- private void startRSHeartBeatMonitoring()
+ private void startRSHeartBeatMonitoring(ConnectedRS rs)
{
- // Start a heartbeat monitor thread.
final long heartbeatInterval = config.getHeartbeatInterval();
if (heartbeatInterval > 0)
{
- heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(),
- getBaseDN().toNormalizedString(), session, heartbeatInterval);
+ heartbeatMonitor = new HeartbeatMonitor(getServerId(), rs.getServerId(),
+ getBaseDN().toNormalizedString(), rs.session, heartbeatInterval);
heartbeatMonitor.start();
}
}
@@ -2302,7 +2274,7 @@
*/
public void reStart(boolean infiniteTry)
{
- reStart(session, infiniteTry);
+ reStart(connectedRS.get().session, infiniteTry);
}
/**
@@ -2319,14 +2291,10 @@
numLostConnections++;
}
- ConnectedRS rs;
- if (failingSession == session)
+ ConnectedRS rs = connectedRS.get();
+ if (failingSession == rs.session && !rs.equals(ConnectedRS.noConnectedRS()))
{
- rs = ConnectedRS.noConnectedRS();
- connectedRS.set(rs);
- setSession(null);
- } else {
- rs = connectedRS.get();
+ rs = setConnectedRS(ConnectedRS.noConnectedRS());
}
while (true)
@@ -2334,14 +2302,14 @@
// Synchronize inside the loop in order to allow shutdown.
synchronized (startStopLock)
{
- if (rs.connected || shutdown)
+ if (rs.isConnected() || shutdown)
{
break;
}
try
{
- connect(rs);
+ connect();
rs = connectedRS.get();
}
catch (Exception e)
@@ -2353,7 +2321,7 @@
logError(mb.toMessage());
}
- if (rs.connected || !infiniteTry)
+ if (rs.isConnected() || !infiniteTry)
{
break;
}
@@ -2362,7 +2330,7 @@
{
Thread.sleep(500);
}
- catch (InterruptedException e)
+ catch (InterruptedException ignored)
{
// ignore
}
@@ -2370,7 +2338,7 @@
if (debugEnabled())
{
- debugInfo("end restart : connected=" + rs.connected + " with RS("
+ debugInfo("end restart : connected=" + rs.isConnected() + " with RS("
+ rs.getServerId() + ") genId=" + generationID);
}
}
@@ -2451,7 +2419,7 @@
Semaphore currentWindowSemaphore;
synchronized (connectPhaseLock)
{
- currentSession = session;
+ currentSession = connectedRS.get().session;
currentWindowSemaphore = sendWindow;
}
@@ -2491,10 +2459,10 @@
Check the session. If it has changed, some disconnection or
reconnection happened and we need to restart from scratch.
*/
- final Session localSession = session;
- if (localSession != null && session == currentSession)
+ final Session session = connectedRS.get().session;
+ if (session != null && session == currentSession)
{
- localSession.publish(msg);
+ session.publish(msg);
done = true;
}
}
@@ -2509,17 +2477,20 @@
window update message was lost somehow...
then loop to check again if connection was closed.
*/
- Session localSession = session;
- if (localSession != null)
+ Session session = connectedRS.get().session;
+ if (session != null)
{
- localSession.publish(new WindowProbeMsg());
+ session.publish(new WindowProbeMsg());
}
}
}
- } catch (IOException e)
+ }
+ catch (IOException e)
{
if (!retryOnFailure)
+ {
return false;
+ }
// The receive threads should handle reconnection or
// mark this broker in error. Just retry.
@@ -2590,17 +2561,17 @@
{
while (!shutdown)
{
- final ConnectedRS rs = connectedRS.get();
- if (reconnectOnFailure && !rs.connected)
+ ConnectedRS rs = connectedRS.get();
+ if (reconnectOnFailure && !rs.isConnected())
{
// infinite try to reconnect
reStart(null, true);
+ continue;
}
// Save session information for later in case we need it for log messages
// after the session has been closed and/or failed.
- final Session localSession = session;
- if (localSession == null)
+ if (rs.session == null)
{
// Must be shutting down.
break;
@@ -2611,7 +2582,7 @@
final int previousRsServerID = rs.getServerId();
try
{
- ReplicationMsg msg = localSession.receive();
+ ReplicationMsg msg = rs.session.receive();
if (msg instanceof UpdateMsg)
{
synchronized (this)
@@ -2621,13 +2592,13 @@
}
if (msg instanceof WindowMsg)
{
- WindowMsg windowMsg = (WindowMsg) msg;
+ final WindowMsg windowMsg = (WindowMsg) msg;
sendWindow.release(windowMsg.getNumAck());
}
else if (msg instanceof TopologyMsg)
{
- TopologyMsg topoMsg = (TopologyMsg) msg;
- receiveTopo(topoMsg);
+ final TopologyMsg topoMsg = (TopologyMsg) msg;
+ receiveTopo(topoMsg, getRsServerId());
if (reconnectToTheBestRS)
{
// Reset wait time before next computation of best server
@@ -2636,19 +2607,20 @@
// Caller wants to check what's changed
if (returnOnTopoChange)
+ {
return msg;
-
+ }
}
else if (msg instanceof StopMsg)
{
// RS performs a proper disconnection
Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
- previousRsServerID, localSession.getReadableRemoteAddress(),
+ previousRsServerID, rs.replicationServer,
serverId, baseDN.toNormalizedString());
logError(message);
// Try to find a suitable RS
- reStart(localSession, true);
+ reStart(rs.session, true);
}
else if (msg instanceof MonitorMsg)
{
@@ -2709,16 +2681,14 @@
if (bestServerInfo == null)
{
message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
- serverId, previousRsServerID,
- localSession.getReadableRemoteAddress(),
+ serverId, previousRsServerID, rs.replicationServer,
baseDN.toNormalizedString());
}
else
{
final int bestRsServerId = bestServerInfo.getServerId();
message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
- serverId, previousRsServerID,
- localSession.getReadableRemoteAddress(),
+ serverId, previousRsServerID, rs.replicationServer,
bestRsServerId,
baseDN.toNormalizedString(),
evals.getEvaluation(previousRsServerID).toString(),
@@ -2754,24 +2724,20 @@
if (!shutdown)
{
- final Session tmpSession = session;
- if (tmpSession == null || !tmpSession.closeInitiated())
+ if (rs.session == null || !rs.session.closeInitiated())
{
// We did not initiate the close on our side, log an error message.
- Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
+ logError(WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
serverId, baseDN.toNormalizedString(), previousRsServerID,
- localSession.getReadableRemoteAddress());
- logError(message);
+ rs.replicationServer));
}
- if (reconnectOnFailure)
- {
- reStart(localSession, true);
- }
- else
+ if (!reconnectOnFailure)
{
break; // does not seem necessary to explicitly disconnect ..
}
+
+ reStart(rs.session, true);
}
}
} // while !shutdown
@@ -2824,10 +2790,10 @@
try
{
updateDoneCount++;
- final Session localSession = session;
- if (updateDoneCount >= halfRcvWindow && localSession != null)
+ final Session session = connectedRS.get().session;
+ if (updateDoneCount >= halfRcvWindow && session != null)
{
- localSession.publish(new WindowMsg(updateDoneCount));
+ session.publish(new WindowMsg(updateDoneCount));
rcvWindow += updateDoneCount;
updateDoneCount = 0;
}
@@ -2850,10 +2816,9 @@
synchronized (startStopLock)
{
shutdown = true;
+ setConnectedRS(ConnectedRS.stopped());
stopRSHeartBeatMonitoring();
stopChangeTimeHeartBeatPublishing();
- connectedRS.set(ConnectedRS.stopped());
- setSession(null);
deregisterReplicationMonitor();
}
}
@@ -2872,10 +2837,10 @@
public void setSoTimeout(int timeout) throws SocketException
{
this.timeout = timeout;
- final Session localSession = session;
- if (localSession != null)
+ final Session session = connectedRS.get().session;
+ if (session != null)
{
- localSession.setSoTimeout(timeout);
+ session.setSoTimeout(timeout);
}
}
@@ -2977,7 +2942,12 @@
*/
public short getProtocolVersion()
{
- return protocolVersion;
+ final Session session = connectedRS.get().session;
+ if (session != null)
+ {
+ return session.getProtocolVersion();
+ }
+ return ProtocolVersion.getCurrentVersion();
}
/**
@@ -2988,7 +2958,7 @@
*/
public boolean isConnected()
{
- return connectedRS.get().connected;
+ return connectedRS.get().isConnected();
}
/**
@@ -2997,8 +2967,8 @@
*/
public boolean isSessionEncrypted()
{
- final Session tmp = session;
- return tmp != null ? tmp.isEncrypted() : false;
+ final Session session = connectedRS.get().session;
+ return session != null ? session.isEncrypted() : false;
}
/**
@@ -3009,9 +2979,8 @@
{
try
{
- ChangeStatusMsg csMsg = new ChangeStatusMsg(ServerStatus.INVALID_STATUS,
- newStatus);
- session.publish(csMsg);
+ connectedRS.get().session.publish(
+ new ChangeStatusMsg(ServerStatus.INVALID_STATUS, newStatus));
} catch (IOException ex)
{
Message message = ERR_EXCEPTION_SENDING_CS.get(
@@ -3051,13 +3020,14 @@
* Computes the list of DSs connected to a particular RS.
* @param rsId The RS id of the server one wants to know the connected DSs
* @param dsList The list of DSinfo from which to compute things
+ * @param rsServerId the serverId to use for the connectedDS
* @return The list of connected DSs to the server rsId
*/
- private List<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList)
+ private Set<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList,
+ int rsServerId)
{
- List<Integer> connectedDSs = new ArrayList<Integer>();
-
- if (getRsServerId() == rsId)
+ final Set<Integer> connectedDSs = new HashSet<Integer>();
+ if (rsServerId == rsId)
{
/*
If we are computing connected DSs for the RS we are connected
@@ -3071,7 +3041,9 @@
for (DSInfo dsInfo : dsList)
{
if (dsInfo.getRsId() == rsId)
+ {
connectedDSs.add(dsInfo.getDsId());
+ }
}
return connectedDSs;
@@ -3081,9 +3053,12 @@
* Processes an incoming TopologyMsg.
* Updates the structures for the local view of the topology.
*
- * @param topoMsg The topology information received from RS.
+ * @param topoMsg
+ * The topology information received from RS.
+ * @param rsServerId
+ * the serverId to use for the connectedDS
*/
- public void receiveTopo(TopologyMsg topoMsg)
+ private void receiveTopo(TopologyMsg topoMsg, int rsServerId)
{
if (debugEnabled())
debugInfo("receive TopologyMsg=" + topoMsg);
@@ -3096,9 +3071,9 @@
final Set<Integer> rssToKeep = new HashSet<Integer>();
for (RSInfo rsInfo : topoMsg.getRsList())
{
- int rsId = rsInfo.getId();
+ final int rsId = rsInfo.getId();
rssToKeep.add(rsId); // Mark this server as still existing
- final List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
+ Set<Integer> connectedDSs = computeConnectedDSs(rsId, dsList, rsServerId);
ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId);
if (rsInfo2 == null)
{
@@ -3106,7 +3081,8 @@
rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs);
setLocallyConfiguredFlag(rsInfo2);
replicationServerInfos.put(rsId, rsInfo2);
- } else
+ }
+ else
{
// Update the existing info for the replication server
rsInfo2.update(rsInfo, connectedDSs);
@@ -3140,20 +3116,18 @@
/**
* Starts publishing to the RS the current timestamp used in this server.
*/
- private void startChangeTimeHeartBeatPublishing()
+ private void startChangeTimeHeartBeatPublishing(ConnectedRS rs)
{
// Start a CSN heartbeat thread.
long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval();
if (changeTimeHeartbeatInterval > 0)
{
- final Session localSession = session;
final String threadName = "Replica DS(" + getServerId()
- + ") change time heartbeat publisher for domain \""
- + getBaseDN() + "\" to RS(" + getRsServerId()
- + ") at " + localSession.getReadableRemoteAddress();
+ + ") change time heartbeat publisher for domain \"" + getBaseDN()
+ + "\" to RS(" + rs.getServerId() + ") at " + rs.replicationServer;
ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
- threadName, localSession, changeTimeHeartbeatInterval, getServerId());
+ threadName, rs.session, changeTimeHeartbeatInterval, getServerId());
ctHeartbeatPublisherThread.start();
}
else
@@ -3206,8 +3180,8 @@
*/
String getLocalUrl()
{
- final Session tmp = session;
- return tmp != null ? tmp.getLocalUrl() : "";
+ final Session session = connectedRS.get().session;
+ return session != null ? session.getLocalUrl() : "";
}
/**
@@ -3221,28 +3195,30 @@
return monitor.getMonitorInstanceName();
}
- private void setSession(final Session newSession)
+ private ConnectedRS setConnectedRS(final ConnectedRS newRS)
{
- // De-register the monitor with the old name.
- deregisterReplicationMonitor();
-
- final Session oldSession = session;
- if (oldSession != null)
+ final ConnectedRS oldRS = connectedRS.getAndSet(newRS);
+ if (!oldRS.equals(newRS) && oldRS.session != null)
{
- oldSession.close();
+ // monitor name is changing, deregister before registering again
+ deregisterReplicationMonitor();
+ oldRS.session.close();
+ registerReplicationMonitor();
}
- session = newSession;
-
- // Re-register the monitor with the new name.
- registerReplicationMonitor();
+ return newRS;
}
+ /**
+ * Must be invoked each time the session changes because, the monitor name is
+ * dynamically created with the session name, while monitor registration is
+ * static.
+ *
+ * @see #monitor
+ */
private void registerReplicationMonitor()
{
- /*
- * The monitor should not be registered if this is a unit test because the
- * replication domain is null.
- */
+ // The monitor should not be registered if this is a unit test
+ // because the replication domain is null.
if (monitor != null)
{
DirectoryServer.registerMonitorProvider(monitor);
@@ -3251,10 +3227,8 @@
private void deregisterReplicationMonitor()
{
- /*
- * The monitor should not be deregistered if this is a unit test because the
- * replication domain is null.
- */
+ // The monitor should not be deregistered if this is a unit test
+ // because the replication domain is null.
if (monitor != null)
{
DirectoryServer.deregisterMonitorProvider(monitor);
--
Gitblit v1.10.0