From 41bef7c0b619c7bc925326451a56071b5736580a Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 19 Jun 2013 08:36:16 +0000
Subject: [PATCH] Fix OPENDJ-986: Exception when reading messages from Replication server RS
---
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java | 73 ++++++++++++++----------------------
1 files changed, 28 insertions(+), 45 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 60c1c75..0291ac3 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -30,6 +30,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.replication.protocol.ProtocolVersion.*;
import java.io.IOException;
import java.util.ArrayList;
@@ -76,8 +77,8 @@
{
try
{
- protocolVersion = ProtocolVersion.minWithCurrent(
- inReplServerStartMsg.getVersion());
+ short protocolVersion = getCompatibleVersion(inReplServerStartMsg
+ .getVersion());
session.setProtocolVersion(protocolVersion);
generationId = inReplServerStartMsg.getGenerationId();
serverId = inReplServerStartMsg.getServerId();
@@ -107,30 +108,21 @@
}
/**
- * Send the ReplServerStartMsg to the remote RS.
- * @param requestedProtocolVersion The provided protocol version.
+ * Sends a start message to the remote RS.
+ *
* @return The ReplServerStartMsg sent.
- * @throws IOException When an exception occurs.
+ * @throws IOException
+ * When an exception occurs.
*/
- private ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
- throws IOException
+ private ReplServerStartMsg sendStartToRemote() throws IOException
{
- ReplServerStartMsg outReplServerStartMsg
- = new ReplServerStartMsg(
- replicationServerId,
- replicationServerURL,
- getServiceId(),
- maxRcvWindow,
- replicationServerDomain.getDbServerState(),
- protocolVersion,
- localGenerationId,
- sslEncryption,
- getLocalGroupId(),
- replicationServerDomain.
- getReplicationServer().getDegradedStatusThreshold());
-
- session.publish(outReplServerStartMsg, requestedProtocolVersion);
-
+ ReplServerStartMsg outReplServerStartMsg = new ReplServerStartMsg(
+ replicationServerId, replicationServerURL, getServiceId(),
+ maxRcvWindow, replicationServerDomain.getDbServerState(),
+ localGenerationId, sslEncryption,
+ getLocalGroupId(), replicationServerDomain.getReplicationServer()
+ .getDegradedStatusThreshold());
+ send(outReplServerStartMsg);
return outReplServerStartMsg;
}
@@ -178,8 +170,7 @@
lockDomain(false); // no timeout
// Send start
- ReplServerStartMsg outReplServerStartMsg =
- sendStartToRemote(ProtocolVersion.getCurrentVersion());
+ ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
// Wait answer
ReplicationMsg msg = session.receive();
@@ -203,7 +194,7 @@
}
}
- // Process hello from remote
+ // Process hello from remote.
processStartFromRemote((ReplServerStartMsg)msg);
// Duplicate server ?
@@ -233,7 +224,7 @@
if (!this.sslEncryption)
session.stopEncryption();
- if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
/*
Only protocol version above V1 has a phase 2 handshake
@@ -242,7 +233,9 @@
Send our own TopologyMsg to remote RS
*/
- TopologyMsg outTopoMsg = sendTopoToRemoteRS();
+ TopologyMsg outTopoMsg =
+ replicationServerDomain.createTopologyMsgForRS();
+ sendTopoInfo(outTopoMsg);
// wait and process Topo from remote RS
TopologyMsg inTopoMsg = waitAndProcessTopoFromRemoteRS();
@@ -341,8 +334,7 @@
}
this.localGenerationId = replicationServerDomain.getGenerationId();
- ReplServerStartMsg outReplServerStartMsg =
- sendStartToRemote(protocolVersion);
+ ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
// log
logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg);
@@ -355,7 +347,7 @@
session.stopEncryption();
TopologyMsg inTopoMsg = null;
- if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
/*
Only protocol version above V1 has a phase 2 handshake
@@ -372,7 +364,9 @@
}
// send our own TopologyMsg to remote RS
- TopologyMsg outTopoMsg = sendTopoToRemoteRS();
+ TopologyMsg outTopoMsg = replicationServerDomain
+ .createTopologyMsgForRS();
+ sendTopoInfo(outTopoMsg);
// log
logTopoHandshakeRCVandSND(inTopoMsg, outTopoMsg);
@@ -476,18 +470,6 @@
}
/**
- * Create and send the topologyMsg to the remote replication server.
- * @return the topologyMsg sent.
- */
- private TopologyMsg sendTopoToRemoteRS()
- throws IOException
- {
- TopologyMsg outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
- session.publish(outTopoMsg, protocolVersion);
- return outTopoMsg;
- }
-
- /**
* Wait receiving the TopologyMsg from the remote RS and process it.
* @return the topologyMsg received or {@code null} if stop was received.
* @throws DirectoryException
@@ -528,12 +510,13 @@
/* Store remote RS weight if it has one.
* For protocol version < 4, use default value of 1 for weight
*/
- if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
// List should only contain RS info for sender
RSInfo rsInfo = inTopoMsg.getRsList().get(0);
weight = rsInfo.getWeight();
}
+
/*
if the remote RS and the local RS have the same genID
then it's ok and nothing else to do
--
Gitblit v1.10.0