From c02dd7f87e9ba574f06e5cc1eb36ebeb76b9f446 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 08 Oct 2009 16:02:17 +0000
Subject: [PATCH] - Addition of ReplServerStartDSMsg now sent to a DS connecting to a RS in handshake phase instead of a ReplServerStartMsg. ReplServerStartDSMsg contains same thing as ReplServerStartMsg but also contains
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java | 120 ++++++++++++++++++++++++++++++++++++++++++++++++++----------
1 files changed, 100 insertions(+), 20 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 3cd91c4..40fe57d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -145,7 +145,21 @@
try
{
if (session != null)
+ {
+ // V4 protocol introduces a StopMsg to properly close the
+ // connection between servers
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ try
+ {
+ session.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
session.close();
+ }
} catch (IOException e)
{
// ignore
@@ -461,7 +475,7 @@
{
TopologyMsg outTopoMsg = replicationServerDomain.createTopologyMsgForDS(
this.serverId);
- session.publish(outTopoMsg);
+ session.publish(outTopoMsg, protocolVersion);
return outTopoMsg;
}
/**
@@ -500,14 +514,13 @@
return;
}
- //
- ReplServerStartMsg outReplServerStartMsg = null;
+ StartMsg outStartMsg = null;
try
{
- outReplServerStartMsg = sendStartToRemote(protocolVersion);
+ outStartMsg = sendStartToRemote(protocolVersion);
// log
- logStartHandshakeRCVandSND(inServerStartMsg, outReplServerStartMsg);
+ logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg);
// The session initiator decides whether to use SSL.
// Until here session is encrypted then it depends on the negotiation
@@ -517,6 +530,13 @@
// wait and process StartSessionMsg from remote RS
StartSessionMsg inStartSessionMsg =
waitAndProcessStartSessionFromRemoteDS();
+ if (inStartSessionMsg == null)
+ {
+ // DS wants to properly close the connection (DS sent a StopMsg)
+ logStopReceived();
+ abortStart(null);
+ return;
+ }
// Send our own TopologyMsg to remote RS
TopologyMsg outTopoMsg = sendTopoToRemoteDS();
@@ -525,18 +545,12 @@
}
catch(IOException e)
{
- // We do not want polluting error log if error is due to normal session
- // aborted after handshake phase one from a DS that is searching for
- // best suitable RS.
-
- // don't log a polluting error when connection aborted
- // from a DS that wanted only to perform handshake phase 1 in order
- // to determine the best suitable RS:
- // 1) -> ServerStartMsg
- // 2) <- ReplServerStartMsg
- // 3) connection closure
-
- throw new DirectoryException(ResultCode.OTHER, null, null);
+ Message errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get(
+ Integer.toString(inServerStartMsg.getServerId()),
+ Integer.toString(replicationServerDomain.getReplicationServer().
+ getServerId()));
+ logError(errMessage);
+ throw new DirectoryException(ResultCode.OTHER, errMessage);
}
catch (NotSupportedOldVersionPDUException e)
{
@@ -578,6 +592,65 @@
replicationServerDomain.release();
}
}
+
+ /**
+ * Send the ReplServerStartDSMsg to the remote DS.
+ * @param requestedProtocolVersion The provided protocol version.
+ * @return The StartMsg sent.
+ * @throws IOException When an exception occurs.
+ */
+ private StartMsg sendStartToRemote(short requestedProtocolVersion)
+ throws IOException
+ {
+ // Before V4 protocol, we sent a ReplServerStartMsg
+ if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+
+ // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
+ ReplServerStartMsg outReplServerStartMsg
+ = new ReplServerStartMsg(
+ replicationServerId,
+ replicationServerURL,
+ getServiceId(),
+ maxRcvWindow,
+ replicationServerDomain.getDbServerState(),
+ protocolVersion,
+ localGenerationId,
+ sslEncryption,
+ getLocalGroupId(),
+ replicationServerDomain.
+ getReplicationServer().getDegradedStatusThreshold());
+
+ session.publish(outReplServerStartMsg, requestedProtocolVersion);
+
+ return outReplServerStartMsg;
+ }
+ else
+ {
+ // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
+ ReplServerStartDSMsg outReplServerStartDSMsg
+ = new ReplServerStartDSMsg(
+ replicationServerId,
+ replicationServerURL,
+ getServiceId(),
+ maxRcvWindow,
+ replicationServerDomain.getDbServerState(),
+ protocolVersion,
+ localGenerationId,
+ sslEncryption,
+ getLocalGroupId(),
+ replicationServerDomain.
+ getReplicationServer().getDegradedStatusThreshold(),
+ replicationServer.getWeight(),
+ replicationServerDomain.getConnectedLDAPservers().size());
+
+
+ session.publish(outReplServerStartDSMsg);
+
+ return outReplServerStartDSMsg;
+ }
+ }
+
/**
* Creates a DSInfo structure representing this remote DS.
* @return The DSInfo structure representing this remote DS
@@ -609,8 +682,10 @@
}
/**
- * Wait receiving the StartSessionMsg from the remote DS and process it.
- * @return the startSessionMsg received
+ * Wait receiving the StartSessionMsg from the remote DS and process it, or
+ * receiving a StopMsg to properly stop the handshake procedure.
+ * @return the startSessionMsg received or null DS sent a stop message to
+ * not finish the handshake.
* @throws DirectoryException
* @throws IOException
* @throws ClassNotFoundException
@@ -625,7 +700,12 @@
ReplicationMsg msg = null;
msg = session.receive();
- if (!(msg instanceof StartSessionMsg))
+ if (msg instanceof StopMsg)
+ {
+ // DS wants to stop handshake (was just for handshake phase one for RS
+ // choice). Return null to make the session be terminated.
+ return null;
+ } else if (!(msg instanceof StartSessionMsg))
{
Message message = Message.raw(
"Protocol error: StartSessionMsg required." + msg + " received.");
--
Gitblit v1.10.0