From 305d344b81f1fd8eb96c6c938ae0be0c268f45af Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 08 Oct 2009 16:02:17 +0000
Subject: [PATCH] - Addition of ReplServerStartDSMsg now sent to a DS connecting to a RS in handshake phase instead of a ReplServerStartMsg. ReplServerStartDSMsg contains same thing as ReplServerStartMsg but also contains
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 467 ++++++++++++++++++++++++++++++++++++++++++++-------------
1 files changed, 358 insertions(+), 109 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 9b39dc7..e058301 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -43,6 +43,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -61,6 +62,7 @@
import org.opends.server.replication.protocol.HeartbeatMonitor;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -68,6 +70,7 @@
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
@@ -112,9 +115,6 @@
// Our replication domain
private ReplicationDomain domain = null;
- // Trick for avoiding a inner class for many parameters return for
- // performPhaseOneHandshake method.
- private String tmpReadableServerName = null;
/**
* The expected duration in milliseconds between heartbeats received
* from the replication server. Zero means heartbeats are off.
@@ -183,7 +183,7 @@
* @param groupId The group id of our domain.
* @param changeTimeHeartbeatInterval The interval (in ms) between Change
* time heartbeats are sent to the RS,
- * or zero if no CN heartbeat shoud be sent.
+ * or zero if no CN heartbeat should be sent.
*/
public ReplicationBroker(ReplicationDomain replicationDomain,
ServerState state, String baseDn, int serverID2, int window,
@@ -290,23 +290,93 @@
/**
* Bag class for keeping info we get from a server in order to compute the
- * best one to connect to.
+ * best one to connect to. This is in fact a wrapper to a
+ * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4).
*/
public static class ServerInfo
{
-
- private ServerState serverState = null;
+ private short protocolVersion;
+ private long generationId;
private byte groupId = (byte) -1;
+ private int serverId;
+ private String serverURL;
+ private String baseDn = null;
+ private int windowSize;
+ private ServerState serverState;
+ private boolean sslEncryption;
+ private int degradedStatusThreshold = -1;
+ // Keeps the -1 value if created with a ReplServerStartMsg
+ private int weight = -1;
+ // Keeps the -1 value if created with a ReplServerStartMsg
+ private int connectedDSNumber = -1;
/**
- * Constructor.
- * @param serverState Server state of the RS
- * @param groupId Group id of the RS
+ * Create a new instance of ServerInfo wrapping the passed message.
+ * @param msg Message to wrap.
+ * @return The new instance wrapping the passed message.
+ * @throws IllegalArgumentException If the passed message has an unexpected
+ * type.
*/
- public ServerInfo(ServerState serverState, byte groupId)
+ public static ServerInfo newServerInfo(
+ ReplicationMsg msg) throws IllegalArgumentException
{
- this.serverState = serverState;
- this.groupId = groupId;
+ if (msg instanceof ReplServerStartMsg)
+ {
+ // This is a ReplServerStartMsg (RS uses protocol V3 or under)
+ ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg)msg;
+ return new ServerInfo(replServerStartMsg);
+ }
+ else if (msg instanceof ReplServerStartDSMsg)
+ {
+ // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
+ ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg)msg;
+ return new ServerInfo(replServerStartDSMsg);
+ }
+
+ // Unsupported message type: should not happen
+ throw new IllegalArgumentException("Unexpected PDU type: " +
+ msg.getClass().getName() + " :\n" + msg.toString());
+ }
+
+ /**
+ * Constructs a ServerInfo object wrapping a ReplServerStartMsg.
+ * @param replServerStartMsg The ReplServerStartMsg this object will wrap.
+ */
+ private ServerInfo(ReplServerStartMsg replServerStartMsg)
+ {
+ this.protocolVersion = replServerStartMsg.getVersion();
+ this.generationId = replServerStartMsg.getGenerationId();
+ this.groupId = replServerStartMsg.getGroupId();
+ this.serverId = replServerStartMsg.getServerId();
+ this.serverURL = replServerStartMsg.getServerURL();
+ this.baseDn = replServerStartMsg.getBaseDn();
+ this.windowSize = replServerStartMsg.getWindowSize();
+ this.serverState = replServerStartMsg.getServerState();
+ this.sslEncryption = replServerStartMsg.getSSLEncryption();
+ this.degradedStatusThreshold =
+ replServerStartMsg.getDegradedStatusThreshold();
+ }
+
+ /**
+ * Constructs a ServerInfo object wrapping a ReplServerStartDSMsg.
+ * @param replServerStartDSMsg The ReplServerStartDSMsg this object will
+ * wrap.
+ */
+ private ServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
+ {
+ this.protocolVersion = replServerStartDSMsg.getVersion();
+ this.generationId = replServerStartDSMsg.getGenerationId();
+ this.groupId = replServerStartDSMsg.getGroupId();
+ this.serverId = replServerStartDSMsg.getServerId();
+ this.serverURL = replServerStartDSMsg.getServerURL();
+ this.baseDn = replServerStartDSMsg.getBaseDn();
+ this.windowSize = replServerStartDSMsg.getWindowSize();
+ this.serverState = replServerStartDSMsg.getServerState();
+ this.sslEncryption = replServerStartDSMsg.getSSLEncryption();
+ this.degradedStatusThreshold =
+ replServerStartDSMsg.getDegradedStatusThreshold();
+ this.weight = replServerStartDSMsg.getWeight();
+ this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber();
}
/**
@@ -326,6 +396,98 @@
{
return groupId;
}
+
+ /**
+ * Get the server protocol version.
+ * @return the protocolVersion
+ */
+ public short getProtocolVersion()
+ {
+ return protocolVersion;
+ }
+
+ /**
+ * Get the generation id.
+ * @return the generationId
+ */
+ public long getGenerationId()
+ {
+ return generationId;
+ }
+
+ /**
+ * Get the server id.
+ * @return the serverId
+ */
+ public int getServerId()
+ {
+ return serverId;
+ }
+
+ /**
+ * Get the server URL.
+ * @return the serverURL
+ */
+ public String getServerURL()
+ {
+ return serverURL;
+ }
+
+ /**
+ * Get the base dn.
+ * @return the baseDn
+ */
+ public String getBaseDn()
+ {
+ return baseDn;
+ }
+
+ /**
+ * Get the window size.
+ * @return the windowSize
+ */
+ public int getWindowSize()
+ {
+ return windowSize;
+ }
+
+ /**
+ * Get the ssl encryption.
+ * @return the sslEncryption
+ */
+ public boolean isSslEncryption()
+ {
+ return sslEncryption;
+ }
+
+ /**
+ * Get the degraded status threshold.
+ * @return the degradedStatusThreshold
+ */
+ public int getDegradedStatusThreshold()
+ {
+ return degradedStatusThreshold;
+ }
+
+ /**
+ * Get the weight.
+ * @return the weight. Null if this object is a wrapper for
+ * a ReplServerStartMsg.
+ */
+ public int getWeight()
+ {
+ return weight;
+ }
+
+ /**
+ * Get the connected DS number.
+ * @return the connectedDSNumber. Null if this object is a wrapper for
+ * a ReplServerStartMsg.
+ */
+ public int getConnectedDSNumber()
+ {
+ return connectedDSNumber;
+ }
}
private void connect()
@@ -342,10 +504,34 @@
}
/**
+ * Contacts all replication servers to get information from them and being
+ * able to choose the more suitable.
+ * @return the collected information.
+ */
+ private Map<String, ServerInfo> collectReplicationServersInfo() {
+
+ Map<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+
+ for (String server : servers)
+ {
+ // Connect to server and get info about it
+ ServerInfo serverInfo = performPhaseOneHandshake(server, false);
+
+ // Store server info in list
+ if (serverInfo != null)
+ {
+ rsInfos.put(server, serverInfo);
+ }
+ }
+
+ return rsInfos;
+ }
+
+ /**
* Special aspects of connecting as ECL compared to connecting as data server
* are :
* - 1 single RS configured
- * - so no choice of the prefered RS
+ * - so no choice of the preferred RS
* - No same groupID polling
* - ?? Heartbeat
* - Start handshake is :
@@ -358,10 +544,10 @@
// FIXME:ECL List of RS to connect is for now limited to one RS only
String bestServer = this.servers.iterator().next();
- ReplServerStartMsg inReplServerStartMsg
+ ReplServerStartDSMsg inReplServerStartDSMsg
= performECLPhaseOneHandshake(bestServer, true);
- if (inReplServerStartMsg!=null)
+ if (inReplServerStartDSMsg!=null)
performECLPhaseTwoHandshake(bestServer);
}
@@ -392,8 +578,6 @@
*/
private void connectAsDataServer()
{
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
-
// May have created a broker with null replication domain for
// unit test purpose.
if (domain != null)
@@ -418,24 +602,12 @@
*/
if (debugEnabled())
TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " +
- " order to elect the prefered one");
- for (String server : servers)
- {
- // Connect to server and get reply message
- ReplServerStartMsg replServerStartMsg =
- performPhaseOneHandshake(server, false);
+ " order to elect the preferred one");
- // Store reply message info in list
- if (replServerStartMsg != null)
- {
- ServerInfo serverInfo =
- new ServerInfo(replServerStartMsg.getServerState(),
- replServerStartMsg.getGroupId());
- rsInfos.put(server, serverInfo);
- }
- } // for servers
+ // Get info from every available replication servers
+ Map<String, ServerInfo> rsInfos = collectReplicationServersInfo();
- ReplServerStartMsg replServerStartMsg = null;
+ ServerInfo serverInfo = null;
if (rsInfos.size() > 0)
{
@@ -446,19 +618,17 @@
// Best found, now initialize connection to this one (handshake phase 1)
if (debugEnabled())
TRACER.debugInfo(
- "phase 2 : will perform PhaseOneH with the prefered RS.");
- replServerStartMsg = performPhaseOneHandshake(bestServer, true);
+ "phase 2 : will perform PhaseOneH with the preferred RS.");
+ serverInfo = performPhaseOneHandshake(bestServer, true);
- if (replServerStartMsg != null) // Handshake phase 1 exchange went well
+ if (serverInfo != null) // Handshake phase 1 exchange went well
{
- ServerInfo bestServerInfo = rsInfos.get(bestServer);
-
// Compute in which status we are starting the session to tell the RS
ServerStatus initStatus =
- computeInitialServerStatus(replServerStartMsg.getGenerationId(),
- bestServerInfo.getServerState(),
- replServerStartMsg.getDegradedStatusThreshold(),
+ computeInitialServerStatus(serverInfo.getGenerationId(),
+ serverInfo.getServerState(),
+ serverInfo.getDegradedStatusThreshold(),
this.getGenerationID());
// Perfom session start (handshake phase 2)
@@ -485,7 +655,7 @@
* reconnection at that time to retrieve a server with our group
* id.
*/
- byte tmpRsGroupId = bestServerInfo.getGroupId();
+ byte tmpRsGroupId = serverInfo.getGroupId();
boolean someServersWithSameGroupId =
hasSomeServerWithSameGroupId(topologyMsg.getRsList());
@@ -493,10 +663,10 @@
if ((tmpRsGroupId == groupId) ||
((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
{
- replicationServer = tmpReadableServerName;
- maxSendWindow = replServerStartMsg.getWindowSize();
- rsGroupId = replServerStartMsg.getGroupId();
- rsServerId = replServerStartMsg.getServerId();
+ replicationServer = session.getReadableRemoteAddress();
+ maxSendWindow = serverInfo.getWindowSize();
+ rsGroupId = serverInfo.getGroupId();
+ rsServerId = serverInfo.getServerId();
rsServerUrl = bestServer;
// May have created a broker with null replication domain for
@@ -504,8 +674,8 @@
if (domain != null)
{
domain.sessionInitiated(
- initStatus, replServerStartMsg.getServerState(),
- replServerStartMsg.getGenerationId(),
+ initStatus, serverInfo.getServerState(),
+ serverInfo.getGenerationId(),
session);
}
receiveTopo(topologyMsg);
@@ -524,7 +694,7 @@
startSameGroupIdPoller();
}
startRSHeartBeatMonitoring();
- if (replServerStartMsg.getVersion()
+ if (serverInfo.getProtocolVersion()
>= ProtocolVersion.REPLICATION_PROTOCOL_V3)
{
startChangeTimeHeartBeatPublishing();
@@ -584,8 +754,8 @@
rcvWindow = maxRcvWindow;
connectPhaseLock.notify();
- if ((replServerStartMsg.getGenerationId() == this.getGenerationID()) ||
- (replServerStartMsg.getGenerationId() == -1))
+ if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
+ (serverInfo.getGenerationId() == -1))
{
Message message =
NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
@@ -602,7 +772,7 @@
baseDn.toString(),
replicationServer,
Long.toString(this.getGenerationID()),
- Long.toString(replServerStartMsg.getGenerationId()));
+ Long.toString(serverInfo.getGenerationId()));
logError(message);
}
} else
@@ -709,19 +879,19 @@
/**
* Connect to the provided server performing the first phase handshake
* (start messages exchange) and return the reply message from the replication
- * server.
+ * server, wrapped in a ServerInfo object.
*
* @param server Server to connect to.
* @param keepConnection Do we keep session opened or not after handshake.
* Use true if want to perform handshake phase 2 with the same session
* and keep the session to create as the current one.
- * @return The ReplServerStartMsg the server replied. Null if could not
+ * @return The answer from the server . Null if could not
* get an answer.
*/
- private ReplServerStartMsg performPhaseOneHandshake(String server,
+ private ServerInfo performPhaseOneHandshake(String server,
boolean keepConnection)
{
- ReplServerStartMsg replServerStartMsg = null;
+ ServerInfo serverInfo = null;
// Parse server string.
int separator = server.lastIndexOf(':');
@@ -738,8 +908,6 @@
int intPort = Integer.parseInt(port);
InetSocketAddress serverAddr = new InetSocketAddress(
InetAddress.getByName(hostname), intPort);
- if (keepConnection)
- tmpReadableServerName = serverAddr.toString();
Socket socket = new Socket();
socket.setReceiveBufferSize(1000000);
socket.setTcpNoDelay(true);
@@ -759,19 +927,23 @@
localSession.publish(serverStartMsg);
/*
- * Read the ReplServerStartMsg that should come back.
+ * Read the ReplServerStartMsg or ReplServerStartDSMsg that should come
+ * back.
*/
- replServerStartMsg = (ReplServerStartMsg) localSession.receive();
+ ReplicationMsg msg = localSession.receive();
if (debugEnabled())
- {
- TRACER.debugInfo("In RB for " + baseDn +
- "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
- "\nAND RECEIVED:\n" + replServerStartMsg.toString());
- }
+ {
+ TRACER.debugInfo("In RB for " + baseDn +
+ "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
+ "\nAND RECEIVED:\n" + msg.toString());
+ }
+
+ // Wrap received message in a server info object
+ serverInfo = ServerInfo.newServerInfo(msg);
// Sanity check
- String repDn = replServerStartMsg.getBaseDn();
+ String repDn = serverInfo.getBaseDn();
if (!(this.baseDn.equals(repDn)))
{
Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -786,7 +958,7 @@
* if it is an old replication server).
*/
protocolVersion = ProtocolVersion.minWithCurrent(
- replServerStartMsg.getVersion());
+ serverInfo.getProtocolVersion());
localSession.setProtocolVersion(protocolVersion);
@@ -839,10 +1011,25 @@
{
if (localSession != null)
{
+ if (debugEnabled())
+ TRACER.debugInfo("In RB, closing session after phase 1");
+
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end communications
+ if (!error)
+ {
+ try
+ {
+ localSession.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
+ }
try
{
- if (debugEnabled())
- TRACER.debugInfo("In RB, closing session after phase 1");
localSession.close();
} catch (IOException e)
{
@@ -852,7 +1039,7 @@
}
if (error)
{
- replServerStartMsg = null;
+ serverInfo = null;
} // Be sure to return null.
}
@@ -864,7 +1051,7 @@
session = localSession;
}
- return replServerStartMsg;
+ return serverInfo;
}
/**
@@ -876,13 +1063,13 @@
* @param keepConnection Do we keep session opened or not after handshake.
* Use true if want to perform handshake phase 2 with the same session
* and keep the session to create as the current one.
- * @return The ReplServerStartMsg the server replied. Null if could not
+ * @return The ReplServerStartDSMsg the server replied. Null if could not
* get an answer.
*/
- private ReplServerStartMsg performECLPhaseOneHandshake(String server,
+ private ReplServerStartDSMsg performECLPhaseOneHandshake(String server,
boolean keepConnection)
{
- ReplServerStartMsg replServerStartMsg = null;
+ ReplServerStartDSMsg replServerStartDSMsg = null;
// Parse server string.
int separator = server.lastIndexOf(':');
@@ -899,8 +1086,6 @@
int intPort = Integer.parseInt(port);
InetSocketAddress serverAddr = new InetSocketAddress(
InetAddress.getByName(hostname), intPort);
- if (keepConnection)
- tmpReadableServerName = serverAddr.toString();
Socket socket = new Socket();
socket.setReceiveBufferSize(1000000);
socket.setTcpNoDelay(true);
@@ -920,17 +1105,17 @@
localSession.publish(serverStartECLMsg);
// Read the ReplServerStartMsg that should come back.
- replServerStartMsg = (ReplServerStartMsg) localSession.receive();
+ replServerStartDSMsg = (ReplServerStartDSMsg) localSession.receive();
if (debugEnabled())
{
TRACER.debugInfo("In RB for " + baseDn +
"\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() +
- "\nAND RECEIVED:\n" + replServerStartMsg.toString());
+ "\nAND RECEIVED:\n" + replServerStartDSMsg.toString());
}
// Sanity check
- String repDn = replServerStartMsg.getBaseDn();
+ String repDn = replServerStartDSMsg.getBaseDn();
if (!(this.baseDn.equals(repDn)))
{
Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -946,7 +1131,7 @@
*/
if (keepConnection)
protocolVersion = ProtocolVersion.minWithCurrent(
- replServerStartMsg.getVersion());
+ replServerStartDSMsg.getVersion());
localSession.setProtocolVersion(protocolVersion);
if (!isSslEncryption)
@@ -998,10 +1183,22 @@
{
if (localSession != null)
{
+ if (debugEnabled())
+ TRACER.debugInfo("In RB, closing session after phase 1");
+
+ // V4 protocol introduces a StopMsg to properly end communications
+ if (!error)
+ {
+ try
+ {
+ localSession.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
try
{
- if (debugEnabled())
- TRACER.debugInfo("In RB, closing session after phase 1");
localSession.close();
} catch (IOException e)
{
@@ -1011,7 +1208,7 @@
}
if (error)
{
- replServerStartMsg = null;
+ replServerStartDSMsg = null;
} // Be sure to return null.
}
@@ -1023,7 +1220,7 @@
session = localSession;
}
- return replServerStartMsg;
+ return replServerStartDSMsg;
}
/**
@@ -1184,8 +1381,7 @@
* @return The computed best replication server.
*/
public static String computeBestReplicationServer(ServerState myState,
- HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn,
- byte groupId)
+ Map<String, ServerInfo> rsInfos, int serverId2, String baseDn, byte groupId)
{
/*
* Preference is given to servers with the requested group id:
@@ -1195,7 +1391,7 @@
*/
// Filter for servers with same group id
- HashMap<String, ServerInfo> sameGroupIdRsInfos =
+ Map<String, ServerInfo> sameGroupIdRsInfos =
new HashMap<String, ServerInfo>();
for (String repServer : rsInfos.keySet())
@@ -1231,7 +1427,7 @@
* @return The computed best replication server.
*/
private static String searchForBestReplicationServer(ServerState myState,
- HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn)
+ Map<String, ServerInfo> rsInfos, int serverId2, String baseDn)
{
/*
* Find replication servers who are up to date (or more up to date than us,
@@ -1266,7 +1462,7 @@
HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
/*
- * Start loop to differenciate up to date servers from late ones.
+ * Start loop to differentiate up to date servers from late ones.
*/
ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId2);
if (myChangeNumber == null)
@@ -1321,6 +1517,7 @@
if (ReplicationServer.isLocalReplicationServer(upServer))
{
localRS = true;
+ break;
}
}
if (localRS)
@@ -1459,7 +1656,8 @@
new HeartbeatMonitor("Replication Heartbeat Monitor on RS " +
getReplicationServer() + " " + rsServerId + " for " + baseDn +
" in DS " + serverId,
- session, heartbeatInterval);
+ session, heartbeatInterval, (protocolVersion >=
+ ProtocolVersion.REPLICATION_PROTOCOL_V4));
heartbeatMonitor.start();
}
}
@@ -1513,16 +1711,28 @@
*/
public void reStart(ProtocolSession failingSession)
{
- try
+
+ if (failingSession != null)
{
- if (failingSession != null)
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end communications
+ try
+ {
+ failingSession.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
+ try
{
failingSession.close();
- numLostConnections++;
+ } catch (IOException e1)
+ {
+ // ignore
}
- } catch (IOException e1)
- {
- // ignore
+ numLostConnections++;
}
if (failingSession == session)
@@ -1708,6 +1918,19 @@
TopologyMsg topoMsg = (TopologyMsg)msg;
receiveTopo(topoMsg);
}
+ else if (msg instanceof StopMsg)
+ {
+ /*
+ * RS performs a proper disconnection
+ */
+ Message message =
+ NOTE_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(replicationServer,
+ Integer.toString(rsServerId), baseDn.toString(),
+ Integer.toString(serverId));
+ logError(message);
+ // Try to find a suitable RS
+ this.reStart(failingSession);
+ }
else
{
return msg;
@@ -1723,10 +1946,10 @@
{
/*
- * If we did not initiate the close on our side, log a message.
+ * We did not initiate the close on our side, log an error message.
*/
Message message =
- NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer,
+ ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer,
Integer.toString(rsServerId), baseDn.toString(),
Integer.toString(serverId));
logError(message);
@@ -1783,14 +2006,26 @@
rsGroupId = (byte) -1;
rsServerId = -1;
rsServerUrl = null;
- try
+
+ if (session != null)
{
- if (session != null)
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end communications
+ try
+ {
+ session.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
+ try
{
session.close();
+ } catch (IOException e)
+ {
}
- } catch (IOException e)
- {
}
}
@@ -1896,7 +2131,7 @@
Collection<String> replicationServers, int window, long heartbeatInterval,
byte groupId)
{
- // These parameters needs to be renegociated with the ReplicationServer
+ // These parameters needs to be renegotiated with the ReplicationServer
// so if they have changed, that requires restarting the session with
// the ReplicationServer.
Boolean needToRestartSession = false;
@@ -1945,7 +2180,7 @@
private boolean debugEnabled()
{
- return true;
+ return false;
}
private static final void debugInfo(String s)
@@ -2057,13 +2292,13 @@
continue;
// Connect to server and get reply message
- ReplServerStartMsg replServerStartMsg =
+ ServerInfo serverInfo =
performPhaseOneHandshake(server, false);
- // Store reply message info in list
- if (replServerStartMsg != null)
+ // Is it a server with our group id ?
+ if (serverInfo != null)
{
- if (groupId == replServerStartMsg.getGroupId())
+ if (groupId == serverInfo.getGroupId())
{
// Found one server with the same group id as us, disconnect
// session to force reconnection to a server with same group
@@ -2072,6 +2307,20 @@
Byte.toString(groupId), baseDn.toString(),
Integer.toString(serverId));
logError(message);
+
+ if (protocolVersion >=
+ ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end
+ // communications
+ try
+ {
+ session.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
try
{
session.close();
--
Gitblit v1.10.0