From c02dd7f87e9ba574f06e5cc1eb36ebeb76b9f446 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 08 Oct 2009 16:02:17 +0000
Subject: [PATCH] - Addition of ReplServerStartDSMsg now sent to a DS connecting to a RS in handshake phase instead of a ReplServerStartMsg. ReplServerStartDSMsg contains same thing as ReplServerStartMsg but also contains
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 108 +++++++++++++++++++++++++++--------------------------
1 files changed, 55 insertions(+), 53 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index ce95024..489dbae 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,13 +50,14 @@
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
+import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
@@ -94,17 +95,21 @@
if (providedMsg != null)
{
if (debugEnabled())
- TRACER.debugInfo("In "
- + ((handler!=null)?handler.toString():"Replication Server")
- + " closing session with err=" +
- providedMsg.toString());
+ TRACER.debugInfo("In " +
+ ((handler != null) ? handler.toString() : "Replication Server") +
+ " closing session with err=" +
+ providedMsg.toString());
logError(providedMsg);
}
try
{
- if (providedSession!=null)
+ if (providedSession != null)
+ // This method is only called when aborting a failing handshake and
+ // not StopMsg should be sent in such situation. StopMsg are only
+ // expected when full handshake has been performed, or at end of
+ // handshake phase 1, when DS was just gathering available RS info
providedSession.close();
- } catch (IOException ee)
+ } catch (IOException e)
{
// ignore
}
@@ -174,7 +179,10 @@
private int rcvWindow;
private int rcvWindowSizeHalf;
- private int maxRcvWindow;
+ /**
+ * The size of the receiving window.
+ */
+ protected int maxRcvWindow;
/**
* Semaphore that the writer uses to control the flow to the remote server.
*/
@@ -197,7 +205,7 @@
*/
protected long localGenerationId = -1;
/**
- * The generation id before procesing a new start handshake.
+ * The generation id before processing a new start handshake.
*/
protected long oldGenerationId = -1;
/**
@@ -210,7 +218,7 @@
protected boolean initSslEncryption;
/**
- * The SSL encryption after the negociation with the peer.
+ * The SSL encryption after the negotiation with the peer.
*/
protected boolean sslEncryption;
/**
@@ -275,17 +283,6 @@
// be disturbed
if (session!=null)
{
- try
- {
- session.publish(
- new ErrorMsg(
- replicationServerDomain.getReplicationServer().getServerId(),
- serverId,
- reason));
- }
- catch(Exception e)
- {
- }
closeSession(session, reason, this);
}
@@ -991,7 +988,14 @@
replicationServerDomain.getReplicationServer().
getMonitorInstanceName() + this +
" publishes message:\n" + msg);
- session.publish(msg);
+ // Currently only MonitorMsg has to support a backward compatibility
+ if (msg instanceof MonitorMsg)
+ {
+ session.publish(msg, protocolVersion);
+ } else
+ {
+ session.publish(msg);
+ }
}
/**
@@ -1017,35 +1021,6 @@
}
/**
- * Send the ReplServerStartMsg to the remote server (RS or DS).
- * @param requestedProtocolVersion The provided protocol version.
- * @return The ReplServerStartMsg sent.
- * @throws IOException When an exception occurs.
- */
- public ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
- throws IOException
- {
- this.localGenerationId = replicationServerDomain.getGenerationId();
- ReplServerStartMsg outReplServerStartMsg
- = new ReplServerStartMsg(
- replicationServerId,
- replicationServerURL,
- getServiceId(),
- maxRcvWindow,
- replicationServerDomain.getDbServerState(),
- protocolVersion,
- localGenerationId,
- sslEncryption,
- getLocalGroupId(),
- replicationServerDomain.
- getReplicationServer().getDegradedStatusThreshold());
-
- session.publish(outReplServerStartMsg, requestedProtocolVersion);
-
- return outReplServerStartMsg;
- }
-
- /**
* Sends the provided TopologyMsg to the peer server.
*
* @param topoMsg The TopologyMsg message to be sent.
@@ -1058,7 +1033,7 @@
// V1 Rs do not support the TopologyMsg
if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
- session.publish(topoMsg);
+ session.publish(topoMsg, protocolVersion);
}
}
@@ -1110,6 +1085,18 @@
if (session != null)
{
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end
+ // communications
+ try
+ {
+ session.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
// Close session to end ServerReader or ServerWriter
try
{
@@ -1328,12 +1315,27 @@
replicationServerDomain.getReplicationServer().
getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + " :" +
- "\nSH SESSION HANDSHAKE RECEIVED:\n" +
+ "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
"\nAND REPLIED:\n" + outTopoMsg.toString());
}
}
/**
+ * Log stop message has been received.
+ */
+ protected void logStopReceived()
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + ", " +
+ this.getClass().getSimpleName() + " " + this + " :" +
+ "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
+ }
+ }
+
+ /**
* Log the messages involved in the Topology/StartSession handshake.
* @param inStartECLSessionMsg The message received first.
*/
--
Gitblit v1.10.0