From 6c857f852f405a08653a4cfd86cc0257e7df486d Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 19 Jun 2013 08:36:16 +0000
Subject: [PATCH] Fix OPENDJ-986: Exception when reading messages from Replication server RS
---
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartMsg.java | 9
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java | 36 -
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 65 +-
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java | 23
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java | 12
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java | 10
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java | 3
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java | 24
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java | 25
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 95 +---
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java | 263 +++++-------
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 18
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java | 73 +--
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java | 15
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java | 7
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java | 10
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java | 7
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java | 22
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java | 10
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java | 9
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java | 64 ---
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 7
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMsg.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java | 11
opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java | 83 +--
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java | 2
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java | 41 +
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java | 11
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java | 10
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java | 10
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 78 +--
opendj-sdk/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java | 9
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java | 7
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java | 46 +-
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AckMsg.java | 2
47 files changed, 412 insertions(+), 740 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AckMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
index 3862a38..fc7289c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -214,7 +214,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short protocolVersion)
{
try
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java
index 947e30f..100ba4e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -91,7 +92,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short protocolVersion)
{
/*
* The message is stored in the form:
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
index 091b23c..05f26dd 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -130,17 +130,6 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
- {
- return getBytes(ProtocolVersion.getCurrentVersion());
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
public byte[] getBytes(short protocolVersion)
{
if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
index 52798ad..50b1ba2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -86,7 +87,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short protocolVersion)
{
try
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
index 7fbc35d..b2a8cad 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.protocol;
@@ -179,13 +179,14 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes() throws UnsupportedEncodingException
+ public byte[] getBytes(short protocolVersion)
+ throws UnsupportedEncodingException
{
byte[] byteCookie = String.valueOf(cookie).getBytes("UTF-8");
byte[] byteServiceId = String.valueOf(serviceId).getBytes("UTF-8");
byte[] byteDraftChangeNumber =
Integer.toString(draftChangeNumber).getBytes("UTF-8");
- byte[] byteUpdateMsg = updateMsg.getBytes();
+ byte[] byteUpdateMsg = updateMsg.getBytes(protocolVersion);
int length = 1 + byteCookie.length +
1 + byteServiceId.length +
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
index 344f2db..e786a48 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
@@ -150,16 +150,6 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
- throws UnsupportedEncodingException
- {
- return getBytes(ProtocolVersion.getCurrentVersion());
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public byte[] getBytes(short version)
{
try {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
index c1321a6..e8718d3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
import org.opends.messages.Message;
@@ -174,15 +175,6 @@
// ============
// Msg encoding
// ============
- /**
- * {@inheritDoc}
- */
- @Override
- public byte[] getBytes()
- throws UnsupportedEncodingException
- {
- return getBytes(ProtocolVersion.getCurrentVersion());
- }
/**
* {@inheritDoc}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java
index 7bbfe92..7ad610f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -67,7 +68,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short protocolVersion)
{
/*
* The heartbeat message contains:
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java
index 71a2912..6d5f2ce 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -103,7 +104,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short protocolVersion)
{
try {
byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java
index 28aab82..06d307c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -130,15 +131,6 @@
// ============
// Msg encoding
// ============
- /**
- * {@inheritDoc}
- */
- @Override
- public byte[] getBytes()
- throws UnsupportedEncodingException
- {
- return getBytes(ProtocolVersion.getCurrentVersion());
- }
/**
* {@inheritDoc}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
index 034246a..e433e83 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -176,15 +177,6 @@
// ============
// Msg encoding
// ============
- /**
- * {@inheritDoc}
- */
- @Override
- public byte[] getBytes()
- throws UnsupportedEncodingException
- {
- return getBytes(ProtocolVersion.getCurrentVersion());
- }
/**
* {@inheritDoc}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
index bce6fcf..9e5ac9c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -82,7 +82,7 @@
/**
* Creates a new UpdateMsg.
*/
- public LDAPUpdateMsg()
+ protected LDAPUpdateMsg()
{
}
@@ -226,7 +226,7 @@
*/
public void encode() throws UnsupportedEncodingException
{
- bytes = getBytes();
+ bytes = getBytes(ProtocolVersion.getCurrentVersion());
}
/**
@@ -347,17 +347,6 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
- throws UnsupportedEncodingException
- {
- // Encode in the current protocol version
- return getBytes(ProtocolVersion.getCurrentVersion());
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public byte[] getBytes(short reqProtocolVersion)
throws UnsupportedEncodingException
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index c616ff4..dfb6eb5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -330,15 +330,6 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
- {
- return getBytes(ProtocolVersion.getCurrentVersion());
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public byte[] getBytes(short protocolVersion)
{
try
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
index 0924254..f742f1b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -88,7 +89,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short protocolVersion)
{
try
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
index 8bf6c5a..c37bfff 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
@@ -49,31 +49,14 @@
public abstract void close();
/**
- * This method is called when a ReplicationMsg must be sent to
- * the remote entity. The PDU is send using serialization of the current
- * protocol version.
+ * Sends a replication message to the remote peer.
*
- * It can be called by several threads and must implement appropriate
- * replication (typically, this method or a part of it should be
- * synchronized).
- *
- * @param msg The ReplicationMsg that must be sent.
- * @throws IOException If an IO error happen during the publish process.
+ * @param msg
+ * The message to be sent.
+ * @throws IOException
+ * If an IO error occurred.
*/
- public abstract void publish(ReplicationMsg msg)
- throws IOException;
-
- /**
- * Same as publish(ReplicationMsg msg), but forcing the usage of a particular
- * protocol version for the PDU serialization.
- *
- * @param msg The ReplicationMsg that must be sent.
- * @param reqProtocolVersion The protocol version to use for serialization.
- * The version should normally be older than the current one.
- * @throws IOException If an IO error happen during the publish process.
- */
- public abstract void publish(ReplicationMsg msg, short reqProtocolVersion)
- throws IOException;
+ public abstract void publish(ReplicationMsg msg) throws IOException;
/**
* Attempt to receive a ReplicationMsg.
@@ -177,4 +160,11 @@
* @param version The version of the protocol that is currently used.
*/
public abstract void setProtocolVersion(short version);
+
+ /**
+ * Returns the version of the protocol that is currently used.
+ *
+ * @return The version of the protocol that is currently used.
+ */
+ public abstract short getProtocolVersion();
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
index b8b834b..be9f1d0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -108,7 +108,7 @@
* @param version The version to be compared to the current one.
* @return The minimal protocol version.
*/
- public static short minWithCurrent(short version)
+ public static short getCompatibleVersion(short version)
{
return (version < CURRENT_VERSION ? version : CURRENT_VERSION);
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
index 5c507ec..2018214 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -74,7 +75,6 @@
* @param baseDn base DN for which the ReplServerStartDSMsg is created.
* @param windowSize The window size.
* @param serverState our ServerState for this baseDn.
- * @param protocolVersion The replication protocol version of the creator.
* @param generationId The generationId for this server.
* @param sslEncryption Whether to continue using SSL to encrypt messages
* after the start messages have been exchanged.
@@ -87,7 +87,6 @@
public ReplServerStartDSMsg(int serverId, String serverURL, String baseDn,
int windowSize,
ServerState serverState,
- short protocolVersion,
long generationId,
boolean sslEncryption,
byte groupId,
@@ -95,7 +94,7 @@
int weight,
int connectedDSNumber)
{
- super(protocolVersion, generationId);
+ super((short) -1 /* version set when sending */, generationId);
this.serverId = serverId;
this.serverURL = serverURL;
if (baseDn != null)
@@ -250,17 +249,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
- throws UnsupportedEncodingException
- {
- return getBytes(ProtocolVersion.getCurrentVersion());
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public byte[] getBytes(short protocolVersion)
+ public byte[] getBytes(short sessionProtocolVersion)
throws UnsupportedEncodingException
{
/* The ReplServerStartDSMsg is stored in the form :
@@ -268,7 +257,6 @@
* <degradedStatusThreshold><weight><connectedDSNumber>
* <serverState>
*/
-
byte[] byteDn = baseDn.getBytes("UTF-8");
byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
byte[] byteServerUrl = serverURL.getBytes("UTF-8");
@@ -290,8 +278,8 @@
byteServerState.length + 1;
/* encode the header in a byte[] large enough */
- byte resultByteArray[] =
- encodeHeader(MSG_TYPE_REPL_SERVER_START_DS, length, protocolVersion);
+ byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START_DS,
+ length, sessionProtocolVersion);
int pos = headerLength;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
index 6fce977..faa68cd 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -72,7 +73,6 @@
* @param baseDn base DN for which the ReplServerStartMsg is created.
* @param windowSize The window size.
* @param serverState our ServerState for this baseDn.
- * @param protocolVersion The replication protocol version of the creator.
* @param generationId The generationId for this server.
* @param sslEncryption Whether to continue using SSL to encrypt messages
* after the start messages have been exchanged.
@@ -82,13 +82,12 @@
public ReplServerStartMsg(int serverId, String serverURL, String baseDn,
int windowSize,
ServerState serverState,
- short protocolVersion,
long generationId,
boolean sslEncryption,
byte groupId,
int degradedStatusThreshold)
{
- super(protocolVersion, generationId);
+ super((short) -1 /* version set when sending */, generationId);
this.serverId = serverId;
this.serverURL = serverURL;
if (baseDn != null)
@@ -302,21 +301,11 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
- throws UnsupportedEncodingException
- {
- return getBytes(ProtocolVersion.getCurrentVersion());
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public byte[] getBytes(short protocolVersion)
+ public byte[] getBytes(short sessionProtocolVersion)
throws UnsupportedEncodingException
{
// If an older version requested, encode in the requested way
- switch(protocolVersion)
+ switch(sessionProtocolVersion)
{
case ProtocolVersion.REPLICATION_PROTOCOL_V1:
return getBytes_V1();
@@ -344,8 +333,8 @@
byteServerState.length + 1;
/* encode the header in a byte[] large enough */
- byte resultByteArray[] =
- encodeHeader(MSG_TYPE_REPL_SERVER_START, length, protocolVersion);
+ byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START, length,
+ sessionProtocolVersion);
int pos = headerLength;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index 7c7cbb5..0c46ee0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -91,176 +91,129 @@
// change accordingly generateMsg method below
/**
- * Return the byte[] representation of this message.
- * Depending on the message type, the first byte of the byte[] must be one of
- * the MSG_TYPE* definitions. The serialization is done using the current
- * protocol version. For a serialization using a particular protocol version,
- * call the getBytes(byte protocolVersion) method that should be available
- * for the subclasses (PDUs) that allow such a translation.
- *
- * @return the byte[] representation of this message.
- * @throws UnsupportedEncodingException When the encoding of the message
- * failed because the UTF-8 encoding is not supported.
+ * Protected constructor.
*/
- public abstract byte[] getBytes() throws UnsupportedEncodingException;
+ protected ReplicationMsg()
+ {
+ // Nothing to do.
+ }
/**
* Serializes the PDU using the provided replication protocol version.
* WARNING: should be overwritten by a PDU (sub class) we want to support
* older protocol version serialization for.
- * @param reqProtocolVersion The protocol version to use for serialization.
- * The version should normally be older than the current one.
+ *
+ * @param protocolVersion
+ * The protocol version to use for serialization. The version should
+ * normally be older than the current one.
* @return The encoded PDU.
- * @throws UnsupportedEncodingException When the encoding of the message
- * failed because the UTF-8 encoding is not supported or the
- * requested protocol version to use is not supported by this PDU.
+ * @throws UnsupportedEncodingException
+ * When the encoding of the message failed because the UTF-8
+ * encoding is not supported or the requested protocol version to
+ * use is not supported by this PDU.
*/
- public byte[] getBytes(short reqProtocolVersion)
- throws UnsupportedEncodingException
- {
- // Of course, always support current protocol version
- if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
- {
- return getBytes();
- }
+ public abstract byte[] getBytes(short protocolVersion)
+ throws UnsupportedEncodingException;
- // Unsupported requested version
- // Any PDU that support older protocol version serialization should
- // overwrite this method for that.
- throw new UnsupportedEncodingException(getClass().getSimpleName() +
- " PDU does not support requested protocol version serialization: " +
- reqProtocolVersion);
- }
/**
- * Generates a ReplicationMsg from its encoded form. This un-serialization
- * is done taking into account the various supported replication protocol
+ * Generates a ReplicationMsg from its encoded form. This un-serialization is
+ * done taking into account the various supported replication protocol
* versions.
*
- * @param buffer The encode form of the ReplicationMsg.
- * @param version The version to use to decode the msg.
- *
+ * @param buffer
+ * The encode form of the ReplicationMsg.
+ * @param protocolVersion
+ * The version to use to decode the msg.
* @return The generated SynchronizationMessage.
- *
- * @throws DataFormatException If the encoded form was not a valid msg.
- * @throws UnsupportedEncodingException If UTF8 is not supported.
- * @throws NotSupportedOldVersionPDUException If the PDU is part of an old
- * protocol version and we do not support it.
+ * @throws DataFormatException
+ * If the encoded form was not a valid msg.
+ * @throws UnsupportedEncodingException
+ * If UTF8 is not supported.
+ * @throws NotSupportedOldVersionPDUException
+ * If the PDU is part of an old protocol version and we do not
+ * support it.
*/
- public static ReplicationMsg generateMsg(
- byte[] buffer,
- short version)
- throws DataFormatException, UnsupportedEncodingException,
- NotSupportedOldVersionPDUException
+ public static ReplicationMsg generateMsg(byte[] buffer, short protocolVersion)
+ throws DataFormatException, UnsupportedEncodingException,
+ NotSupportedOldVersionPDUException
{
- ReplicationMsg msg;
switch (buffer[0])
{
- case MSG_TYPE_SERVER_START_V1:
- throw new NotSupportedOldVersionPDUException("Server Start",
+ case MSG_TYPE_SERVER_START_V1:
+ throw new NotSupportedOldVersionPDUException("Server Start",
ProtocolVersion.REPLICATION_PROTOCOL_V1, buffer[0]);
- case MSG_TYPE_REPL_SERVER_INFO_V1:
- throw new NotSupportedOldVersionPDUException("Replication Server Info",
+ case MSG_TYPE_REPL_SERVER_INFO_V1:
+ throw new NotSupportedOldVersionPDUException("Replication Server Info",
ProtocolVersion.REPLICATION_PROTOCOL_V1, buffer[0]);
- case MSG_TYPE_MODIFY:
- msg = new ModifyMsg(buffer);
- break;
- case MSG_TYPE_MODIFY_V1:
- msg = ModifyMsg.createV1(buffer);
- break;
- case MSG_TYPE_ADD:
- case MSG_TYPE_ADD_V1:
- msg = new AddMsg(buffer);
- break;
- case MSG_TYPE_DELETE:
- case MSG_TYPE_DELETE_V1:
- msg = new DeleteMsg(buffer);
- break;
- case MSG_TYPE_MODIFYDN:
- case MSG_TYPE_MODIFYDN_V1:
- msg = new ModifyDNMsg(buffer);
- break;
- case MSG_TYPE_ACK:
- msg = new AckMsg(buffer);
- break;
- case MSG_TYPE_SERVER_START:
- msg = new ServerStartMsg(buffer);
- break;
- case MSG_TYPE_REPL_SERVER_START:
- case MSG_TYPE_REPL_SERVER_START_V1:
- msg = new ReplServerStartMsg(buffer);
- break;
- case MSG_TYPE_WINDOW:
- msg = new WindowMsg(buffer);
- break;
- case MSG_TYPE_HEARTBEAT:
- msg = new HeartbeatMsg(buffer);
- break;
- case MSG_TYPE_INITIALIZE_REQUEST:
- msg = new InitializeRequestMsg(buffer, version);
- break;
- case MSG_TYPE_INITIALIZE_TARGET:
- msg = new InitializeTargetMsg(buffer, version);
- break;
- case MSG_TYPE_ENTRY:
- msg = new EntryMsg(buffer, version);
- break;
- case MSG_TYPE_DONE:
- msg = new DoneMsg(buffer);
- break;
- case MSG_TYPE_ERROR:
- msg = new ErrorMsg(buffer, version);
- break;
- case MSG_TYPE_RESET_GENERATION_ID:
- msg = new ResetGenerationIdMsg(buffer);
- break;
- case MSG_TYPE_WINDOW_PROBE:
- msg = new WindowProbeMsg(buffer);
- break;
- case MSG_TYPE_TOPOLOGY:
- msg = new TopologyMsg(buffer, version);
- break;
- case MSG_TYPE_REPL_SERVER_MONITOR_REQUEST:
- msg = new MonitorRequestMsg(buffer);
- break;
- case MSG_TYPE_REPL_SERVER_MONITOR:
- msg = new MonitorMsg(buffer, version);
- break;
- case MSG_TYPE_START_SESSION:
- msg = new StartSessionMsg(buffer, version);
- break;
- case MSG_TYPE_CHANGE_STATUS:
- msg = new ChangeStatusMsg(buffer);
- break;
- case MSG_TYPE_GENERIC_UPDATE:
- msg = new UpdateMsg(buffer);
- break;
- case MSG_TYPE_START_ECL:
- msg = new ServerStartECLMsg(buffer);
- break;
- case MSG_TYPE_START_ECL_SESSION:
- msg = new StartECLSessionMsg(buffer);
- break;
- case MSG_TYPE_ECL_UPDATE:
- msg = new ECLUpdateMsg(buffer);
- break;
- case MSG_TYPE_CT_HEARTBEAT:
- msg = new ChangeTimeHeartbeatMsg(buffer, version);
- break;
- case MSG_TYPE_REPL_SERVER_START_DS:
- msg = new ReplServerStartDSMsg(buffer);
- break;
- case MSG_TYPE_STOP:
- msg = new StopMsg(buffer);
- break;
- case MSG_TYPE_INITIALIZE_RCV_ACK:
- msg = new InitializeRcvAckMsg(buffer);
- break;
- default:
- throw new DataFormatException("received message with unknown type");
+ case MSG_TYPE_MODIFY:
+ return new ModifyMsg(buffer);
+ case MSG_TYPE_MODIFY_V1:
+ return ModifyMsg.createV1(buffer);
+ case MSG_TYPE_ADD:
+ case MSG_TYPE_ADD_V1:
+ return new AddMsg(buffer);
+ case MSG_TYPE_DELETE:
+ case MSG_TYPE_DELETE_V1:
+ return new DeleteMsg(buffer);
+ case MSG_TYPE_MODIFYDN:
+ case MSG_TYPE_MODIFYDN_V1:
+ return new ModifyDNMsg(buffer);
+ case MSG_TYPE_ACK:
+ return new AckMsg(buffer);
+ case MSG_TYPE_SERVER_START:
+ return new ServerStartMsg(buffer);
+ case MSG_TYPE_REPL_SERVER_START:
+ case MSG_TYPE_REPL_SERVER_START_V1:
+ return new ReplServerStartMsg(buffer);
+ case MSG_TYPE_WINDOW:
+ return new WindowMsg(buffer);
+ case MSG_TYPE_HEARTBEAT:
+ return new HeartbeatMsg(buffer);
+ case MSG_TYPE_INITIALIZE_REQUEST:
+ return new InitializeRequestMsg(buffer, protocolVersion);
+ case MSG_TYPE_INITIALIZE_TARGET:
+ return new InitializeTargetMsg(buffer, protocolVersion);
+ case MSG_TYPE_ENTRY:
+ return new EntryMsg(buffer, protocolVersion);
+ case MSG_TYPE_DONE:
+ return new DoneMsg(buffer);
+ case MSG_TYPE_ERROR:
+ return new ErrorMsg(buffer, protocolVersion);
+ case MSG_TYPE_RESET_GENERATION_ID:
+ return new ResetGenerationIdMsg(buffer);
+ case MSG_TYPE_WINDOW_PROBE:
+ return new WindowProbeMsg(buffer);
+ case MSG_TYPE_TOPOLOGY:
+ return new TopologyMsg(buffer, protocolVersion);
+ case MSG_TYPE_REPL_SERVER_MONITOR_REQUEST:
+ return new MonitorRequestMsg(buffer);
+ case MSG_TYPE_REPL_SERVER_MONITOR:
+ return new MonitorMsg(buffer, protocolVersion);
+ case MSG_TYPE_START_SESSION:
+ return new StartSessionMsg(buffer, protocolVersion);
+ case MSG_TYPE_CHANGE_STATUS:
+ return new ChangeStatusMsg(buffer);
+ case MSG_TYPE_GENERIC_UPDATE:
+ return new UpdateMsg(buffer);
+ case MSG_TYPE_START_ECL:
+ return new ServerStartECLMsg(buffer);
+ case MSG_TYPE_START_ECL_SESSION:
+ return new StartECLSessionMsg(buffer);
+ case MSG_TYPE_ECL_UPDATE:
+ return new ECLUpdateMsg(buffer);
+ case MSG_TYPE_CT_HEARTBEAT:
+ return new ChangeTimeHeartbeatMsg(buffer, protocolVersion);
+ case MSG_TYPE_REPL_SERVER_START_DS:
+ return new ReplServerStartDSMsg(buffer);
+ case MSG_TYPE_STOP:
+ return new StopMsg(buffer);
+ case MSG_TYPE_INITIALIZE_RCV_ACK:
+ return new InitializeRcvAckMsg(buffer);
+ default:
+ throw new DataFormatException("received message with unknown type");
}
- return msg;
}
/**
@@ -283,15 +236,21 @@
return pos;
}
+
+
/**
* Get the length of the next String encoded in the in byte array.
*
- * @param in the byte array where to calculate the string.
- * @param pos the position where to start from in the byte array.
+ * @param in
+ * the byte array where to calculate the string.
+ * @param pos
+ * the position where to start from in the byte array.
* @return the length of the next string.
- * @throws DataFormatException If the byte array does not end with null.
+ * @throws DataFormatException
+ * If the byte array does not end with null.
*/
- protected int getNextLength(byte[] in, int pos) throws DataFormatException
+ protected static int getNextLength(byte[] in, int pos)
+ throws DataFormatException
{
int offset = pos;
int length = 0;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java
index 5186ea3..79c1bb9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -83,7 +84,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short protocolVersion)
{
try
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
index db3b7e4..2538995 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
@@ -73,10 +73,9 @@
* @param windowSize The window size used by this server.
* @param heartbeatInterval The requested heartbeat interval.
* @param serverState The state of this server.
- * @param protocolVersion The replication protocol version of the creator.
* @param generationId The generationId for this server.
* @param sslEncryption Whether to continue using SSL to encrypt messages
-* after the start messages have been exchanged.
+ * after the start messages have been exchanged.
* @param groupId The group id of the DS for this DN
*/
public ServerStartECLMsg(String serverURL, int maxReceiveDelay,
@@ -84,12 +83,11 @@
int maxSendQueue, int windowSize,
long heartbeatInterval,
ServerState serverState,
- short protocolVersion,
long generationId,
boolean sslEncryption,
byte groupId)
{
- super(protocolVersion, generationId);
+ super((short) -1 /* version set when sending */, generationId);
this.serverURL = serverURL;
this.maxReceiveDelay = maxReceiveDelay;
@@ -251,7 +249,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short sessionProtocolVersion)
{
try {
byte[] byteServerUrl = serverURL.getBytes("UTF-8");
@@ -282,8 +280,8 @@
byteServerState.length + 1;
/* encode the header in a byte[] large enough to also contain the mods */
- byte resultByteArray[] = encodeHeader(
- MSG_TYPE_START_ECL, length, ProtocolVersion.getCurrentVersion());
+ byte resultByteArray[] = encodeHeader(MSG_TYPE_START_ECL, length,
+ sessionProtocolVersion);
int pos = headerLength;
pos = addByteArray(byteServerUrl, resultByteArray, pos);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
index d516608..c2a0da1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
@@ -73,7 +73,6 @@
* @param windowSize The window size used by this server.
* @param heartbeatInterval The requested heartbeat interval.
* @param serverState The state of this server.
- * @param protocolVersion The replication protocol version of the creator.
* @param generationId The generationId for this server.
* @param sslEncryption Whether to continue using SSL to encrypt messages
* after the start messages have been exchanged.
@@ -81,10 +80,10 @@
*/
public ServerStartMsg(int serverId2, String serverURL, String baseDn,
int windowSize, long heartbeatInterval, ServerState serverState,
- short protocolVersion, long generationId, boolean sslEncryption,
+ long generationId, boolean sslEncryption,
byte groupId)
{
- super(protocolVersion, generationId);
+ super((short) -1 /* version set when sending */, generationId);
this.serverId = serverId2;
this.serverURL = serverURL;
@@ -282,7 +281,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short sessionProtocolVersion)
{
try {
byte[] byteDn = baseDn.getBytes("UTF-8");
@@ -316,8 +315,8 @@
byteServerState.length + 1;
/* encode the header in a byte[] large enough to also contain the mods */
- byte resultByteArray[] = encodeHeader(
- MSG_TYPE_SERVER_START, length, ProtocolVersion.getCurrentVersion());
+ byte resultByteArray[] = encodeHeader(MSG_TYPE_SERVER_START, length,
+ sessionProtocolVersion);
int pos = headerLength;
pos = addByteArray(byteDn, resultByteArray, pos);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
index 99c16d2..5ac31be 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
@@ -212,7 +212,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short protocolVersion)
{
String excludedSIDsString = "";
for (String excludedServiceID : excludedServiceIDs)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
index b621cae..d9f16e2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
@@ -54,8 +54,9 @@
/**
* Create a new StartMsg.
*/
- public StartMsg()
+ protected StartMsg()
{
+ // Nothing to do.
}
/**
@@ -78,7 +79,7 @@
* @param type The type of the message to create.
* @param additionalLength Additional length needed to encode the remaining
* part of the UpdateMessage.
- * @param protocolVersion The version to use when encoding the header.
+ * @param sessionProtocolVersion The version to use when encoding the header.
* @return a byte array containing the common header and enough space to
* encode the remaining bytes of the UpdateMessage as was specified
* by the additionalLength.
@@ -87,7 +88,7 @@
*/
public byte[] encodeHeader(
byte type, int additionalLength,
- short protocolVersion)
+ short sessionProtocolVersion)
throws UnsupportedEncodingException
{
@@ -106,7 +107,7 @@
encodedMsg[0] = type;
/* put the protocol version */
- encodedMsg[1] = (byte)protocolVersion;
+ encodedMsg[1] = (byte)sessionProtocolVersion;
/* put the generationId */
int pos = 2;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
index 21e9d8d..c3d84be 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
@@ -77,11 +77,6 @@
private Set<String> eclIncludesForDeletes = new HashSet<String>();
/**
- * The protocolVersion that should be used when serializing this message.
- */
- private final short protocolVersion;
-
- /**
* Creates a new StartSessionMsg message from its encoded form.
*
* @param in The byte array containing the encoded form of the message.
@@ -91,7 +86,6 @@
*/
public StartSessionMsg(byte[] in, short version) throws DataFormatException
{
- protocolVersion = ProtocolVersion.getCurrentVersion();
if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
{
decode_V23(in);
@@ -103,29 +97,6 @@
}
/**
- * Creates a new StartSessionMsg message from its encoded form.
- *
- * Creates a new message with the given required parameters.
- * @param status Status we are starting with
- * @param referralsURLs Referrals URLs to be used by peer DSs
- * @param assuredFlag If assured mode is enabled or not
- * @param assuredMode Assured type
- * @param safeDataLevel Assured mode safe data level
- * @param replicationProtocol The protocol version to use.
- */
- public StartSessionMsg(ServerStatus status, List<String> referralsURLs,
- boolean assuredFlag, AssuredMode assuredMode, byte safeDataLevel,
- short replicationProtocol)
- {
- this.referralsURLs = referralsURLs;
- this.status = status;
- this.assuredFlag = assuredFlag;
- this.assuredMode = assuredMode;
- this.safeDataLevel = safeDataLevel;
- this.protocolVersion = replicationProtocol;
- }
-
- /**
* Creates a new message with the given required parameters.
* @param status Status we are starting with
* @param referralsURLs Referrals URLs to be used by peer DSs
@@ -141,7 +112,6 @@
this.assuredFlag = assuredFlag;
this.assuredMode = assuredMode;
this.safeDataLevel = safeDataLevel;
- this.protocolVersion = ProtocolVersion.getCurrentVersion();
}
/**
@@ -155,23 +125,6 @@
this.referralsURLs = referralsURLs;
this.status = status;
this.assuredFlag = false;
- this.protocolVersion = ProtocolVersion.getCurrentVersion();
- }
-
- /**
- * Creates a new message with the given required parameters.
- * Assured mode is false.
- * @param status Status we are starting with
- * @param referralsURLs Referrals URLs to be used by peer DSs
- * @param replicationProtocol The requested protocol version.
- */
- public StartSessionMsg(ServerStatus status, List<String> referralsURLs,
- short replicationProtocol)
- {
- this.referralsURLs = referralsURLs;
- this.status = status;
- this.assuredFlag = false;
- this.protocolVersion = replicationProtocol;
}
// ============
@@ -182,23 +135,6 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
- throws UnsupportedEncodingException
- {
- if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
- {
- return getBytes_V23();
- }
- else
- {
- return getBytes_V45(protocolVersion);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public byte[] getBytes(short reqProtocolVersion)
throws UnsupportedEncodingException
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java
index e6c260a..4c2db2a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -60,7 +61,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short protocolVersion)
{
return new byte[]
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index 112efab..f27c950 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -312,19 +312,7 @@
@Override
public void publish(final ReplicationMsg msg) throws IOException
{
- publish(msg, ProtocolVersion.getCurrentVersion());
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void publish(final ReplicationMsg msg,
- final short reqProtocolVersion) throws IOException
- {
- final byte[] buffer = msg.getBytes(reqProtocolVersion);
+ final byte[] buffer = msg.getBytes(protocolVersion);
final String str = String.format("%08x", buffer.length);
final byte[] sendLengthBuf = str.getBytes();
@@ -460,6 +448,17 @@
* {@inheritDoc}
*/
@Override
+ public short getProtocolVersion()
+ {
+ return protocolVersion;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void setSoTimeout(final int timeout) throws SocketException
{
plainSocket.setSoTimeout(timeout);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index 16fbfc5..4b5306e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -298,16 +298,6 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
- throws UnsupportedEncodingException
- {
- return getBytes(ProtocolVersion.getCurrentVersion());
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public byte[] getBytes(short version)
throws UnsupportedEncodingException
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
index 1c30cc8..cbef678 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -109,6 +109,8 @@
/**
* Creates a new UpdateMsg with the given informations.
+ * <p>
+ * This constructor is only used for testing.
*
* @param changeNumber The ChangeNumber associated with the change
* encoded in this message.
@@ -179,18 +181,6 @@
return changeNumber.compareTo(msg.getChangeNumber());
}
-
- /**
- * {@inheritDoc}
- */
- @Override
- public byte[] getBytes(short reqProtocolVersion)
- throws UnsupportedEncodingException
- {
- // There was no change since version 2.
- return getBytes();
- }
-
/**
* Get the assured mode in this message.
* @return The assured mode in this message
@@ -355,20 +345,37 @@
}
/**
+ * Returns the encoded representation of this update message using the current
+ * protocol version.
+ *
+ * @return The encoded representation of this update message.
+ * @throws UnsupportedEncodingException
+ * If the message could not be encoded.
+ */
+ public byte[] getBytes() throws UnsupportedEncodingException
+ {
+ return getBytes(ProtocolVersion.getCurrentVersion());
+ }
+
+ /**
+ * This implementation is only called during unit testing, so we are free to
+ * force the protocol version. Underlying implementations override this method
+ * in order to provide version specific encodings.
+ *
* {@inheritDoc}
*/
@Override
- public byte[] getBytes() throws UnsupportedEncodingException
+ public byte[] getBytes(short protocolVersion)
+ throws UnsupportedEncodingException
{
/* Encode the header in a byte[] large enough to also contain the payload */
- byte [] resultByteArray =
- encodeHeader(MSG_TYPE_GENERIC_UPDATE, payload.length,
- ProtocolVersion.getCurrentVersion());
+ byte[] resultByteArray = encodeHeader(MSG_TYPE_GENERIC_UPDATE,
+ payload.length, ProtocolVersion.getCurrentVersion());
int pos = resultByteArray.length - payload.length;
/* Add the payload */
- for (int i=0; i<payload.length; i++,pos++)
+ for (int i = 0; i < payload.length; i++, pos++)
{
resultByteArray[pos] = payload[i];
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMsg.java
index 57cbac0..92f3fe9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -93,7 +94,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short protocolVersion)
{
/*
* WindowMsg contains.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java
index 2f06913..91e3679 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -68,7 +69,7 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short protocolVersion)
{
// WindowProbeMsg Message only contains its type.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 63a15fe..ef55993 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -31,6 +31,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.replication.common.StatusMachine.*;
+import static org.opends.server.replication.protocol.ProtocolVersion.*;
import java.io.IOException;
import java.util.ArrayList;
@@ -147,7 +148,7 @@
{
// V4 protocol introduces a StopMsg to properly close the
// connection between servers
- if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
try
{
@@ -411,9 +412,9 @@
public boolean processStartFromRemote(ServerStartMsg serverStartMsg)
throws DirectoryException
{
+ session
+ .setProtocolVersion(getCompatibleVersion(serverStartMsg.getVersion()));
tmpGenerationId = serverStartMsg.getGenerationId();
- protocolVersion = ProtocolVersion.minWithCurrent(
- serverStartMsg.getVersion());
serverId = serverStartMsg.getServerId();
serverURL = serverStartMsg.getServerURL();
groupId = serverStartMsg.getGroupId();
@@ -451,14 +452,14 @@
}
// Send our own TopologyMsg to DS
- private TopologyMsg sendTopoToRemoteDS()
- throws IOException
+ private TopologyMsg sendTopoToRemoteDS() throws IOException
{
- TopologyMsg outTopoMsg = replicationServerDomain.createTopologyMsgForDS(
- this.serverId);
- session.publish(outTopoMsg, protocolVersion);
+ TopologyMsg outTopoMsg = replicationServerDomain
+ .createTopologyMsgForDS(this.serverId);
+ sendTopoInfo(outTopoMsg);
return outTopoMsg;
}
+
/**
* Starts the handler from a remote ServerStart message received from
* the remote data server.
@@ -512,7 +513,7 @@
try
{
- StartMsg outStartMsg = sendStartToRemote(protocolVersion);
+ StartMsg outStartMsg = sendStartToRemote();
// log
logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg);
@@ -597,61 +598,41 @@
}
/**
- * Send the ReplServerStartDSMsg to the remote DS.
- * @param requestedProtocolVersion The provided protocol version.
+ * Sends a start message to the remote DS.
+ *
* @return The StartMsg sent.
- * @throws IOException When an exception occurs.
+ * @throws IOException
+ * When an exception occurs.
*/
- private StartMsg sendStartToRemote(short requestedProtocolVersion)
- throws IOException
+ private StartMsg sendStartToRemote() throws IOException
{
+ final StartMsg startMsg;
+
// Before V4 protocol, we sent a ReplServerStartMsg
- if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
-
// Peer DS uses protocol < V4 : send it a ReplServerStartMsg
- ReplServerStartMsg outReplServerStartMsg
- = new ReplServerStartMsg(
- replicationServerId,
- replicationServerURL,
- getServiceId(),
- maxRcvWindow,
+ startMsg = new ReplServerStartMsg(replicationServerId,
+ replicationServerURL, getServiceId(), maxRcvWindow,
replicationServerDomain.getDbServerState(),
- protocolVersion,
- localGenerationId,
- sslEncryption,
- getLocalGroupId(),
- replicationServerDomain.
- getReplicationServer().getDegradedStatusThreshold());
-
- session.publish(outReplServerStartMsg, requestedProtocolVersion);
-
- return outReplServerStartMsg;
+ localGenerationId, sslEncryption, getLocalGroupId(),
+ replicationServerDomain.getReplicationServer()
+ .getDegradedStatusThreshold());
}
else
{
// Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
- ReplServerStartDSMsg outReplServerStartDSMsg
- = new ReplServerStartDSMsg(
- replicationServerId,
- replicationServerURL,
- getServiceId(),
- maxRcvWindow,
+ startMsg = new ReplServerStartDSMsg(replicationServerId,
+ replicationServerURL, getServiceId(), maxRcvWindow,
replicationServerDomain.getDbServerState(),
- protocolVersion,
- localGenerationId,
- sslEncryption,
- getLocalGroupId(),
- replicationServerDomain.
- getReplicationServer().getDegradedStatusThreshold(),
- replicationServer.getWeight(),
+ localGenerationId, sslEncryption, getLocalGroupId(),
+ replicationServerDomain.getReplicationServer()
+ .getDegradedStatusThreshold(), replicationServer.getWeight(),
replicationServerDomain.getConnectedLDAPservers().size());
-
-
- session.publish(outReplServerStartDSMsg);
-
- return outReplServerStartDSMsg;
}
+
+ send(startMsg);
+ return startMsg;
}
/**
@@ -662,7 +643,7 @@
{
return new DSInfo(serverId, serverURL, replicationServerId, generationId,
status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
- eclIncludes, eclIncludesForDeletes, protocolVersion);
+ eclIncludes, eclIncludesForDeletes, getProtocolVersion());
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 69663a1..4e35589 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -30,6 +30,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.replication.protocol.ProtocolVersion.*;
import java.io.IOException;
import java.util.ArrayList;
@@ -325,13 +326,12 @@
{
try
{
- protocolVersion = ProtocolVersion.minWithCurrent(
- inECLStartMsg.getVersion());
- generationId = inECLStartMsg.getGenerationId();
+ session.setProtocolVersion(getCompatibleVersion(inECLStartMsg
+ .getVersion()));
serverURL = inECLStartMsg.getServerURL();
setInitialServerState(inECLStartMsg.getServerState());
setSendWindowSize(inECLStartMsg.getWindowSize());
- if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
// We support connection from a V1 RS
// Only V2 protocol has the group id in repl server start message
@@ -347,59 +347,38 @@
}
/**
- * Send the ReplServerStartDSMsg to the remote ECL server.
- * @param requestedProtocolVersion The provided protocol version.
+ * Sends a start message to the remote ECL server.
+ *
* @return The StartMsg sent.
- * @throws IOException When an exception occurs.
+ * @throws IOException
+ * When an exception occurs.
*/
- private StartMsg sendStartToRemote(short requestedProtocolVersion)
- throws IOException
+ private StartMsg sendStartToRemote() throws IOException
{
+ final StartMsg startMsg;
+
// Before V4 protocol, we sent a ReplServerStartMsg
- if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
-
// Peer DS uses protocol < V4 : send it a ReplServerStartMsg
- ReplServerStartMsg outReplServerStartMsg
- = new ReplServerStartMsg(
- replicationServerId,
- replicationServerURL,
- getServiceId(),
- maxRcvWindow,
+ startMsg = new ReplServerStartMsg(replicationServerId,
+ replicationServerURL, getServiceId(), maxRcvWindow,
replicationServerDomain.getDbServerState(),
- protocolVersion,
- localGenerationId,
- sslEncryption,
- getLocalGroupId(),
- replicationServerDomain.
- getReplicationServer().getDegradedStatusThreshold());
-
- session.publish(outReplServerStartMsg, requestedProtocolVersion);
-
- return outReplServerStartMsg;
+ localGenerationId, sslEncryption, getLocalGroupId(),
+ replicationServerDomain.getReplicationServer()
+ .getDegradedStatusThreshold());
}
else
{
// Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
- ReplServerStartDSMsg outReplServerStartDSMsg
- = new ReplServerStartDSMsg(
- replicationServerId,
- replicationServerURL,
- getServiceId(),
- maxRcvWindow,
- new ServerState(),
- protocolVersion,
- localGenerationId,
- sslEncryption,
- getLocalGroupId(),
- 0,
- replicationServer.getWeight(),
- 0);
-
-
- session.publish(outReplServerStartDSMsg);
- return outReplServerStartDSMsg;
+ startMsg = new ReplServerStartDSMsg(replicationServerId,
+ replicationServerURL, getServiceId(), maxRcvWindow,
+ new ServerState(), localGenerationId, sslEncryption,
+ getLocalGroupId(), 0, replicationServer.getWeight(), 0);
}
+
+ send(startMsg);
+ return startMsg;
}
/**
@@ -476,14 +455,15 @@
processStartFromRemote(inECLStartMsg);
// lock with timeout
- if (this.replicationServerDomain != null)
+ if (replicationServerDomain != null)
+ {
lockDomain(true);
+ }
- this.localGenerationId = -1;
+ localGenerationId = -1;
// send start to remote
- StartMsg outStartMsg =
- sendStartToRemote(protocolVersion);
+ StartMsg outStartMsg = sendStartToRemote();
// log
logStartHandshakeRCVandSND(inECLStartMsg, outStartMsg);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
index 5388c55..c718865 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -64,7 +64,6 @@
private ProtocolSession session;
private ECLServerHandler handler;
private ReplicationServerDomain replicationServerDomain;
- private short protocolVersion = -1;
private boolean suspended;
private boolean shutdown;
private PersistentSearch mypsearch;
@@ -88,8 +87,6 @@
this.session = session;
this.handler = handler;
this.replicationServerDomain = replicationServerDomain;
- // Keep protocol version locally for efficiency
- this.protocolVersion = handler.getProtocolVersion();
this.suspended = false;
this.shutdown = false;
@@ -249,7 +246,7 @@
// Done is used to end phase 1
session.publish(new DoneMsg(
handler.getReplicationServerId(),
- handler.getServerId()), protocolVersion);
+ handler.getServerId()));
}
}
@@ -295,7 +292,7 @@
if (session!=null)
{
- session.publish(msg, protocolVersion);
+ session.publish(msg);
}
else
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java
index ae96d4e..9470d9a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java
@@ -322,15 +322,6 @@
* {@inheritDoc}
*/
@Override
- public byte[] getBytes() throws UnsupportedEncodingException
- {
- return getBytes(ProtocolVersion.getCurrentVersion());
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public AssuredMode getAssuredMode()
{
return realUpdateMsg.getAssuredMode();
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 629da1c..3c23700 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -330,21 +330,18 @@
if (msg instanceof ServerStartMsg)
{
- session.setProtocolVersion(((StartMsg)msg).getVersion());
DataServerHandler handler = new DataServerHandler(session,
queueSize,serverURL,serverId,this,rcvWindow);
handler.startFromRemoteDS((ServerStartMsg)msg);
}
else if (msg instanceof ReplServerStartMsg)
{
- session.setProtocolVersion(((StartMsg)msg).getVersion());
ReplicationServerHandler handler = new ReplicationServerHandler(
session,queueSize,serverURL,serverId,this,rcvWindow);
handler.startFromRemoteRS((ReplServerStartMsg)msg);
}
else if (msg instanceof ServerStartECLMsg)
{
- session.setProtocolVersion(((StartMsg)msg).getVersion());
ECLServerHandler handler = new ECLServerHandler(
session,queueSize,serverURL,serverId,this,rcvWindow);
handler.startFromRemoteServer((ServerStartECLMsg)msg);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 4969607..a6ebfc1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -617,8 +617,7 @@
if (preparedAssuredInfo.expectedServers == null)
{
// No eligible servers found, send the ack immediately
- AckMsg ack = new AckMsg(cn);
- sourceHandler.sendAck(ack);
+ sourceHandler.send(new AckMsg(cn));
}
return preparedAssuredInfo;
@@ -672,8 +671,7 @@
* mode with safe data level 1, coming from a DS. No need to wait
* for more acks
*/
- AckMsg ack = new AckMsg(cn);
- sourceHandler.sendAck(ack);
+ sourceHandler.send(new AckMsg(cn));
} else
{
/**
@@ -700,8 +698,7 @@
} else
{
// level > 1, so Ack this message to originator RS
- AckMsg ack = new AckMsg(cn);
- sourceHandler.sendAck(ack);
+ sourceHandler.send(new AckMsg(cn));
}
}
}
@@ -753,8 +750,7 @@
{
// level > 1 and source is a DS but no eligible servers found, send the
// ack immediately
- AckMsg ack = new AckMsg(cn);
- sourceHandler.sendAck(ack);
+ sourceHandler.send(new AckMsg(cn));
}
}
@@ -798,7 +794,7 @@
ServerHandler origServer = expectedAcksInfo.getRequesterServer();
try
{
- origServer.sendAck(finalAck);
+ origServer.send(finalAck);
} catch (IOException e)
{
/**
@@ -877,7 +873,7 @@
Integer.toString(origServer.getServerId()));
try
{
- origServer.sendAck(finalAck);
+ origServer.send(finalAck);
} catch (IOException e)
{
/**
@@ -2489,7 +2485,7 @@
getReplicationServer().getServerId(),
handler.getServerId(),
message);
- handler.sendError(errorMsg);
+ handler.send(errorMsg);
}
/*
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 60c1c75..0291ac3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -30,6 +30,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.replication.protocol.ProtocolVersion.*;
import java.io.IOException;
import java.util.ArrayList;
@@ -76,8 +77,8 @@
{
try
{
- protocolVersion = ProtocolVersion.minWithCurrent(
- inReplServerStartMsg.getVersion());
+ short protocolVersion = getCompatibleVersion(inReplServerStartMsg
+ .getVersion());
session.setProtocolVersion(protocolVersion);
generationId = inReplServerStartMsg.getGenerationId();
serverId = inReplServerStartMsg.getServerId();
@@ -107,30 +108,21 @@
}
/**
- * Send the ReplServerStartMsg to the remote RS.
- * @param requestedProtocolVersion The provided protocol version.
+ * Sends a start message to the remote RS.
+ *
* @return The ReplServerStartMsg sent.
- * @throws IOException When an exception occurs.
+ * @throws IOException
+ * When an exception occurs.
*/
- private ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
- throws IOException
+ private ReplServerStartMsg sendStartToRemote() throws IOException
{
- ReplServerStartMsg outReplServerStartMsg
- = new ReplServerStartMsg(
- replicationServerId,
- replicationServerURL,
- getServiceId(),
- maxRcvWindow,
- replicationServerDomain.getDbServerState(),
- protocolVersion,
- localGenerationId,
- sslEncryption,
- getLocalGroupId(),
- replicationServerDomain.
- getReplicationServer().getDegradedStatusThreshold());
-
- session.publish(outReplServerStartMsg, requestedProtocolVersion);
-
+ ReplServerStartMsg outReplServerStartMsg = new ReplServerStartMsg(
+ replicationServerId, replicationServerURL, getServiceId(),
+ maxRcvWindow, replicationServerDomain.getDbServerState(),
+ localGenerationId, sslEncryption,
+ getLocalGroupId(), replicationServerDomain.getReplicationServer()
+ .getDegradedStatusThreshold());
+ send(outReplServerStartMsg);
return outReplServerStartMsg;
}
@@ -178,8 +170,7 @@
lockDomain(false); // no timeout
// Send start
- ReplServerStartMsg outReplServerStartMsg =
- sendStartToRemote(ProtocolVersion.getCurrentVersion());
+ ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
// Wait answer
ReplicationMsg msg = session.receive();
@@ -203,7 +194,7 @@
}
}
- // Process hello from remote
+ // Process hello from remote.
processStartFromRemote((ReplServerStartMsg)msg);
// Duplicate server ?
@@ -233,7 +224,7 @@
if (!this.sslEncryption)
session.stopEncryption();
- if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
/*
Only protocol version above V1 has a phase 2 handshake
@@ -242,7 +233,9 @@
Send our own TopologyMsg to remote RS
*/
- TopologyMsg outTopoMsg = sendTopoToRemoteRS();
+ TopologyMsg outTopoMsg =
+ replicationServerDomain.createTopologyMsgForRS();
+ sendTopoInfo(outTopoMsg);
// wait and process Topo from remote RS
TopologyMsg inTopoMsg = waitAndProcessTopoFromRemoteRS();
@@ -341,8 +334,7 @@
}
this.localGenerationId = replicationServerDomain.getGenerationId();
- ReplServerStartMsg outReplServerStartMsg =
- sendStartToRemote(protocolVersion);
+ ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
// log
logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg);
@@ -355,7 +347,7 @@
session.stopEncryption();
TopologyMsg inTopoMsg = null;
- if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
/*
Only protocol version above V1 has a phase 2 handshake
@@ -372,7 +364,9 @@
}
// send our own TopologyMsg to remote RS
- TopologyMsg outTopoMsg = sendTopoToRemoteRS();
+ TopologyMsg outTopoMsg = replicationServerDomain
+ .createTopologyMsgForRS();
+ sendTopoInfo(outTopoMsg);
// log
logTopoHandshakeRCVandSND(inTopoMsg, outTopoMsg);
@@ -476,18 +470,6 @@
}
/**
- * Create and send the topologyMsg to the remote replication server.
- * @return the topologyMsg sent.
- */
- private TopologyMsg sendTopoToRemoteRS()
- throws IOException
- {
- TopologyMsg outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
- session.publish(outTopoMsg, protocolVersion);
- return outTopoMsg;
- }
-
- /**
* Wait receiving the TopologyMsg from the remote RS and process it.
* @return the topologyMsg received or {@code null} if stop was received.
* @throws DirectoryException
@@ -528,12 +510,13 @@
/* Store remote RS weight if it has one.
* For protocol version < 4, use default value of 1 for weight
*/
- if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
// List should only contain RS info for sender
RSInfo rsInfo = inTopoMsg.getRsList().get(0);
weight = rsInfo.getWeight();
}
+
/*
if the remote RS and the local RS have the same genID
then it's ok and nothing else to do
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index e6915fd..7be892c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -49,12 +49,7 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
-import org.opends.server.replication.protocol.ErrorMsg;
-import org.opends.server.replication.protocol.EntryMsg;
-import org.opends.server.replication.protocol.InitializeRequestMsg;
-import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
-import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -193,11 +188,6 @@
* The initial size of the sending window.
*/
int sendWindowSize;
-
- /**
- * The protocol version established with the remote server.
- */
- protected short protocolVersion = -1;
/**
* remote generation id.
*/
@@ -267,7 +257,6 @@
super(queueSize, replicationServerURL,
replicationServerId, replicationServer);
this.session = session;
- this.protocolVersion = ProtocolVersion.getCurrentVersion();
this.rcvWindowSizeHalf = rcvWindowSize / 2;
this.maxRcvWindow = rcvWindowSize;
this.rcvWindow = rcvWindowSize;
@@ -399,13 +388,24 @@
/**
* Sends a message.
- * @param msg The message to be sent.
- * @throws IOException When it occurs while sending the message,
*
+ * @param msg
+ * The message to be sent.
+ * @throws IOException
+ * When it occurs while sending the message,
*/
- public void send(ReplicationMsg msg)
- throws IOException
+ public void send(ReplicationMsg msg) throws IOException
{
+ /*
+ * Some unit tests include a null domain, so avoid logging anything in that
+ * case.
+ */
+ if (debugEnabled() && replicationServerDomain != null)
+ {
+ TRACER.debugInfo("In "
+ + replicationServerDomain.getReplicationServer()
+ .getMonitorInstanceName() + this + " publishes message:\n" + msg);
+ }
session.publish(msg);
}
@@ -653,7 +653,7 @@
*/
public short getProtocolVersion()
{
- return protocolVersion;
+ return session.getProtocolVersion();
}
/**
@@ -950,68 +950,19 @@
}
/**
- * Send an InitializeRequestMessage to the server connected through this
- * handler.
- *
- * @param msg The message to be processed
- * @throws IOException when raised by the underlying session
- */
- public void send(RoutableMsg msg) throws IOException
- {
- if (debugEnabled())
- TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + this +
- " publishes message:\n" + msg);
-
- // Currently only MonitorMsg has to support a backward compatibility
- if ((msg instanceof MonitorMsg) || (msg instanceof ErrorMsg) ||
- (msg instanceof EntryMsg) || (msg instanceof InitializeRequestMsg) ||
- (msg instanceof InitializeTargetMsg))
- {
- session.publish(msg, protocolVersion);
- } else
- {
- session.publish(msg);
- }
- }
-
- /**
- * Sends an ack message to the server represented by this object.
- *
- * @param ack The ack message to be sent.
- * @throws IOException In case of Exception thrown sending the ack.
- */
- public void sendAck(AckMsg ack) throws IOException
- {
- session.publish(ack);
- }
-
- /**
- * Send an ErrorMsg to the peer.
- *
- * @param errorMsg The message to be sent
- * @throws IOException when raised by the underlying session
- */
- public void sendError(ErrorMsg errorMsg) throws IOException
- {
- session.publish(errorMsg);
- }
-
- /**
* Sends the provided TopologyMsg to the peer server.
*
- * @param topoMsg The TopologyMsg message to be sent.
- * @throws IOException When it occurs while sending the message,
- *
+ * @param topoMsg
+ * The TopologyMsg message to be sent.
+ * @throws IOException
+ * When it occurs while sending the message,
*/
- public void sendTopoInfo(TopologyMsg topoMsg)
- throws IOException
+ public void sendTopoInfo(TopologyMsg topoMsg) throws IOException
{
// V1 Rs do not support the TopologyMsg
- if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
- session.publish(topoMsg, protocolVersion);
+ send(topoMsg);
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 402d6e4..4aae7d7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
import org.opends.messages.Message;
@@ -58,7 +58,6 @@
private final ProtocolSession session;
private final ServerHandler handler;
private final ReplicationServerDomain replicationServerDomain;
- private final short protocolVersion;
@@ -88,8 +87,6 @@
this.session = session;
this.handler = handler;
this.replicationServerDomain = replicationServerDomain;
- // Keep protocol version locally for efficiency
- this.protocolVersion = handler.getProtocolVersion();
}
/**
@@ -194,7 +191,7 @@
// Publish the update to the remote server using a protocol version he
// it supports
- session.publish(update, protocolVersion);
+ session.publish(update);
}
}
catch (NoSuchElementException e)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index be2db38..7ba4c8e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,6 +31,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.server.ReplicationServer.*;
import static org.opends.server.util.StaticUtils.*;
@@ -1259,14 +1260,12 @@
{
serverStartMsg = new ServerStartMsg(serverId, url, baseDn,
maxRcvWindow, heartbeatInterval, state,
- ProtocolVersion.getCurrentVersion(),
this.getGenerationID(), isSslEncryption, groupId);
}
else
{
serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
maxRcvWindow, heartbeatInterval, state,
- ProtocolVersion.getCurrentVersion(),
this.getGenerationID(), isSslEncryption, groupId);
}
localSession.publish(serverStartMsg);
@@ -1299,8 +1298,8 @@
* replication server will use the same one (or an older one if it is an
* old replication server).
*/
- final short localProtocolVersion = ProtocolVersion
- .minWithCurrent(replServerInfo.getProtocolVersion());
+ final short localProtocolVersion = getCompatibleVersion(replServerInfo
+ .getProtocolVersion());
if (keepConnection)
{
protocolVersion = localProtocolVersion;
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index 463a644..74b8119 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -473,7 +473,7 @@
String serverURL = ("localhost:" + port);
ReplServerStartMsg replServerStartMsg = new ReplServerStartMsg(serverId,
serverURL, baseDn, windowSize, serverState,
- ProtocolVersion.getCurrentVersion(), generationId, sslEncryption,
+ generationId, sslEncryption,
groupId, degradedStatusThreshold);
session.publish(replServerStartMsg);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
index 68768e4..72eecaa 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -108,7 +108,7 @@
cn = new ChangeNumber(0L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, WINNER, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -158,7 +158,7 @@
cn = new ChangeNumber(0L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, WINNER, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -210,7 +210,7 @@
cn = new ChangeNumber(0L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, WINNER, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -262,7 +262,7 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, WINNER, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -315,7 +315,7 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, LOOSER1, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -328,7 +328,7 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(12, WINNER, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -383,7 +383,7 @@
// This server has less changes than the other one but it has the same
// group id as us so he should be the winner
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, WINNER, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -396,7 +396,7 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(12, LOOSER1, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(12, LOOSER1, null, 0, aState, 0L,
false, (byte)2, 0);
rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -449,7 +449,7 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, LOOSER1, null, 0, aState, 0L,
false, (byte)2, 0);
rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -462,7 +462,7 @@
cn = new ChangeNumber(2L, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(12, WINNER, null, 0, aState, 0L,
false, (byte)2, 0);
rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -516,7 +516,7 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, LOOSER1, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -529,7 +529,7 @@
cn = new ChangeNumber(4L, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(12, LOOSER2, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(12, LOOSER2, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -542,7 +542,7 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(13, WINNER, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(13, WINNER, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -596,7 +596,7 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, LOOSER1, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -609,7 +609,7 @@
cn = new ChangeNumber(3L, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(12, LOOSER2, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(12, LOOSER2, null, 0, aState, 0L,
false, (byte)2, 0);
rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -624,7 +624,7 @@
// This server has less changes than looser2 but it has the same
// group id as us so he should be the winner
replServerStartMsg =
- new ReplServerStartMsg(13, WINNER, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(13, WINNER, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -676,7 +676,7 @@
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, WINNER, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -766,7 +766,7 @@
cn = new ChangeNumber(looser1T3, 0, myId3);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(11, LOOSER1, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
if (looser1IsLocal)
@@ -781,7 +781,7 @@
cn = new ChangeNumber(winnerT3, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(12, WINNER, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
if (winnerIsLocal)
@@ -796,7 +796,7 @@
cn = new ChangeNumber(looser2T3, 0, myId3);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(13, LOOSER2, null, 0, aState, (short)0, 0L,
+ new ReplServerStartMsg(13, LOOSER2, null, 0, aState, 0L,
false, (byte)1, 0);
rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
if (looser2IsLocal)
@@ -867,7 +867,7 @@
cn = new ChangeNumber(looser1T1, 0, myId1);
aState.update(cn);
ReplServerStartMsg replServerStartMsg =
- new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, looser1GenId,
+ new ReplServerStartMsg(11, LOOSER1, null, 0, aState, looser1GenId,
false, looser1GroupId, 0);
rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
if (looser1IsLocal)
@@ -878,7 +878,7 @@
cn = new ChangeNumber(winnerT1, 0, myId1);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, winnerGenId,
+ new ReplServerStartMsg(12, WINNER, null, 0, aState, winnerGenId,
false, winnerGroupId, 0);
rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
if (winnerIsLocal)
@@ -889,7 +889,7 @@
cn = new ChangeNumber(looser2T1, 0, myId1);
aState.update(cn);
replServerStartMsg =
- new ReplServerStartMsg(13, LOOSER2, null, 0, aState, (short)0, looser2GenId,
+ new ReplServerStartMsg(13, LOOSER2, null, 0, aState, looser2GenId,
false, looser2GroupId, 0);
rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
if (looser2IsLocal)
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
index 0580d7a..35a9e88 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -65,6 +65,7 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT;
+import static org.opends.server.replication.protocol.ProtocolVersion.getCurrentVersion;
import static org.opends.server.util.StaticUtils.byteToHex;
import static org.opends.messages.ReplicationMessages.*;
@@ -134,12 +135,9 @@
public void replServerStartMsgTestVLASTV1(int serverId, String baseDN, int window,
String url, ServerState state, long genId, byte groupId, int degTh) throws Exception
{
- // Create VLAST message
+ // Create message with no version.
ReplServerStartMsg msg = new ReplServerStartMsg(serverId,
- url, baseDN, window, state, ProtocolVersion.getCurrentVersion(), genId, true, groupId, degTh);
-
- // Check version of message
- assertEquals(msg.getVersion(), REPLICATION_PROTOCOL_VLAST);
+ url, baseDN, window, state, genId, true, groupId, degTh);
// Serialize in V1
byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -169,7 +167,7 @@
newMsg.setDegradedStatusThreshold(degTh);
// Serialize in VLAST msg
- ReplServerStartMsg vlastMsg = new ReplServerStartMsg(newMsg.getBytes());
+ ReplServerStartMsg vlastMsg = new ReplServerStartMsg(newMsg.getBytes(getCurrentVersion()));
// Check original version of message
assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -962,7 +960,7 @@
assertEquals(msg.getServerId(), serverId);
assertEquals(msg.getBaseDn(), dn);
assertEquals(msg.getGroupId(), groupId);
- BigInteger bi = new BigInteger(msg.getBytes());
+ BigInteger bi = new BigInteger(msg.getBytes(getCurrentVersion()));
assertEquals(bi.toString(16), oldPdu);
}
@@ -1187,7 +1185,7 @@
newMsg.setMsgId(msgId);
// Serialize in VLAST
- EntryMsg vlastMsg = new EntryMsg(newMsg.getBytes(),REPLICATION_PROTOCOL_VLAST);
+ EntryMsg vlastMsg = new EntryMsg(newMsg.getBytes(getCurrentVersion()),REPLICATION_PROTOCOL_VLAST);
// Check we retrieve original VLAST message (VLAST fields)
assertEquals(msg.getSenderID(), vlastMsg.getSenderID());
@@ -1233,7 +1231,7 @@
newMsg.setCreationTime(creatTime);
// Serialize in VLAST
- ErrorMsg vlastMsg = new ErrorMsg(newMsg.getBytes(),
+ ErrorMsg vlastMsg = new ErrorMsg(newMsg.getBytes(getCurrentVersion()),
REPLICATION_PROTOCOL_VLAST);
// Check we retrieve original VLAST message (VLAST fields)
@@ -1284,8 +1282,8 @@
newMsg.setInitWindow(initWindow);
// Serialize in VLAST
- InitializeRequestMsg vlastMsg = new InitializeRequestMsg(newMsg.getBytes(),
- REPLICATION_PROTOCOL_VLAST);
+ InitializeRequestMsg vlastMsg = new InitializeRequestMsg(
+ newMsg.getBytes(getCurrentVersion()), REPLICATION_PROTOCOL_VLAST);
// Check we retrieve original VLAST message (VLAST fields)
assertEquals(msg.getSenderID(), vlastMsg.getSenderID());
@@ -1341,8 +1339,8 @@
newMsg.setInitWindow(initWindow);
// Serialize in VLAST
- InitializeTargetMsg vlastMsg = new InitializeTargetMsg(newMsg.getBytes(),
- REPLICATION_PROTOCOL_VLAST);
+ InitializeTargetMsg vlastMsg = new InitializeTargetMsg(
+ newMsg.getBytes(getCurrentVersion()), REPLICATION_PROTOCOL_VLAST);
// Check we retrieve original VLAST message (VLAST fields)
assertEquals(msg.getSenderID(), vlastMsg.getSenderID());
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 4d5e766..5a91ec6 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -29,6 +29,7 @@
import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT;
+import static org.opends.server.replication.protocol.ProtocolVersion.getCurrentVersion;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -735,7 +736,7 @@
assertEquals(msg1.getFailedServers(), failedServers);
// Constructor test (with byte[])
- msg2 = new AckMsg(msg1.getBytes());
+ msg2 = new AckMsg(msg1.getBytes(getCurrentVersion()));
assertEquals(msg2.getChangeNumber().compareTo(cn), 0);
assertTrue(msg1.hasTimeout() == msg2.hasTimeout());
assertTrue(msg1.hasWrongStatus() == msg2.hasWrongStatus());
@@ -743,7 +744,7 @@
assertEquals(msg1.getFailedServers(), msg2.getFailedServers());
// Check invalid bytes for constructor
- byte[] b = msg1.getBytes();
+ byte[] b = msg1.getBytes(getCurrentVersion());
b[0] = ReplicationMsg.MSG_TYPE_ADD;
try
{
@@ -758,7 +759,7 @@
// Check that retrieved CN is OK
msg2 = (AckMsg) ReplicationMsg.generateMsg(
- msg1.getBytes(), ProtocolVersion.getCurrentVersion());
+ msg1.getBytes(getCurrentVersion()), getCurrentVersion());
}
@Test(enabled=true)
@@ -793,7 +794,7 @@
assertTrue(delmsg.compareTo(delmsg2)==0);
// Constructor test (with byte[])
- ECLUpdateMsg msg2 = new ECLUpdateMsg(msg1.getBytes());
+ ECLUpdateMsg msg2 = new ECLUpdateMsg(msg1.getBytes(getCurrentVersion()));
assertTrue(msg2.getCookie().equalsTo(msg2.getCookie()));
assertTrue(msg2.getCookie().equalsTo(cookie));
assertTrue(msg2.getServiceId().equalsIgnoreCase(msg1.getServiceId()));
@@ -836,8 +837,8 @@
{
ServerStartMsg msg = new ServerStartMsg(
serverId, "localhost:1234", baseDN, window, window, state,
- ProtocolVersion.getCurrentVersion(), genId, sslEncryption, groupId);
- ServerStartMsg newMsg = new ServerStartMsg(msg.getBytes());
+ genId, sslEncryption, groupId);
+ ServerStartMsg newMsg = new ServerStartMsg(msg.getBytes(getCurrentVersion()));
assertEquals(msg.getServerId(), newMsg.getServerId());
assertEquals(msg.getServerURL(), newMsg.getServerURL());
assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
@@ -846,7 +847,7 @@
assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
assertEquals(msg.getServerState().getMaxChangeNumber(1),
newMsg.getServerState().getMaxChangeNumber(1));
- assertEquals(msg.getVersion(), newMsg.getVersion());
+ assertEquals(newMsg.getVersion(), getCurrentVersion());
assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
assertTrue(msg.getGroupId() == newMsg.getGroupId());
}
@@ -879,16 +880,16 @@
String url, ServerState state, long genId, byte groupId, int degTh) throws Exception
{
ReplServerStartMsg msg = new ReplServerStartMsg(serverId,
- url, baseDN, window, state, ProtocolVersion.getCurrentVersion(), genId,
+ url, baseDN, window, state, genId,
true, groupId, degTh);
- ReplServerStartMsg newMsg = new ReplServerStartMsg(msg.getBytes());
+ ReplServerStartMsg newMsg = new ReplServerStartMsg(msg.getBytes(getCurrentVersion()));
assertEquals(msg.getServerId(), newMsg.getServerId());
assertEquals(msg.getServerURL(), newMsg.getServerURL());
assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
assertEquals(msg.getServerState().getMaxChangeNumber(1),
newMsg.getServerState().getMaxChangeNumber(1));
- assertEquals(msg.getVersion(), newMsg.getVersion());
+ assertEquals(newMsg.getVersion(), getCurrentVersion());
assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
assertTrue(msg.getGroupId() == newMsg.getGroupId());
@@ -925,16 +926,16 @@
int weight, int connectedDSNumber) throws Exception
{
ReplServerStartDSMsg msg = new ReplServerStartDSMsg(serverId,
- url, baseDN, window, state, ProtocolVersion.getCurrentVersion(), genId,
+ url, baseDN, window, state, genId,
true, groupId, degTh, weight, connectedDSNumber);
- ReplServerStartDSMsg newMsg = new ReplServerStartDSMsg(msg.getBytes());
+ ReplServerStartDSMsg newMsg = new ReplServerStartDSMsg(msg.getBytes(getCurrentVersion()));
assertEquals(msg.getServerId(), newMsg.getServerId());
assertEquals(msg.getServerURL(), newMsg.getServerURL());
assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
assertEquals(msg.getServerState().getMaxChangeNumber(1),
newMsg.getServerState().getMaxChangeNumber(1));
- assertEquals(msg.getVersion(), newMsg.getVersion());
+ assertEquals(newMsg.getVersion(), getCurrentVersion());
assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
assertTrue(msg.getGroupId() == newMsg.getGroupId());
@@ -952,7 +953,7 @@
public void stopMsgTest() throws Exception
{
StopMsg msg = new StopMsg();
- StopMsg newMsg = new StopMsg(msg.getBytes());
+ StopMsg newMsg = new StopMsg(msg.getBytes(getCurrentVersion()));
}
/**
@@ -963,7 +964,7 @@
public void windowMsgTest() throws Exception
{
WindowMsg msg = new WindowMsg(123);
- WindowMsg newMsg = new WindowMsg(msg.getBytes());
+ WindowMsg newMsg = new WindowMsg(msg.getBytes(getCurrentVersion()));
assertEquals(msg.getNumAck(), newMsg.getNumAck());
}
@@ -976,7 +977,7 @@
public void windowProbeMsgTest() throws Exception
{
WindowProbeMsg msg = new WindowProbeMsg();
- new WindowProbeMsg(msg.getBytes());
+ new WindowProbeMsg(msg.getBytes(getCurrentVersion()));
}
@DataProvider(name="createTopologyData")
@@ -1071,7 +1072,7 @@
throws Exception
{
TopologyMsg msg = new TopologyMsg(dsList, rsList);
- TopologyMsg newMsg = new TopologyMsg(msg.getBytes(),
+ TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()),
ProtocolVersion.getCurrentVersion());
assertEquals(msg.getDsList(), newMsg.getDsList());
assertEquals(msg.getRsList(), newMsg.getRsList());
@@ -1139,7 +1140,7 @@
assuredMode, safedataLevel);
msg.setEclIncludes(attrs, attrs);
StartSessionMsg newMsg =
- new StartSessionMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
+ new StartSessionMsg(msg.getBytes(getCurrentVersion()),getCurrentVersion());
assertEquals(msg.getStatus(), newMsg.getStatus());
assertTrue(msg.isAssured() == newMsg.isAssured());
assertEquals(msg.getAssuredMode(), newMsg.getAssuredMode());
@@ -1170,7 +1171,7 @@
throws Exception
{
ChangeStatusMsg msg = new ChangeStatusMsg(reqStatus, newStatus);
- ChangeStatusMsg newMsg = new ChangeStatusMsg(msg.getBytes());
+ ChangeStatusMsg newMsg = new ChangeStatusMsg(msg.getBytes(getCurrentVersion()));
assertEquals(msg.getRequestedStatus(), newMsg.getRequestedStatus());
assertEquals(msg.getNewStatus(), newMsg.getNewStatus());
}
@@ -1182,7 +1183,7 @@
public void heartbeatMsgTest() throws Exception
{
HeartbeatMsg msg = new HeartbeatMsg();
- HeartbeatMsg newMsg = new HeartbeatMsg(msg.getBytes());
+ HeartbeatMsg newMsg = new HeartbeatMsg(msg.getBytes(getCurrentVersion()));
assertNotNull(newMsg);
}
@@ -1193,7 +1194,7 @@
public void resetGenerationIdMsgTest() throws Exception
{
ResetGenerationIdMsg msg = new ResetGenerationIdMsg(23657);
- ResetGenerationIdMsg newMsg = new ResetGenerationIdMsg(msg.getBytes());
+ ResetGenerationIdMsg newMsg = new ResetGenerationIdMsg(msg.getBytes(getCurrentVersion()));
assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
}
@@ -1204,7 +1205,7 @@
public void monitorRequestMsgTest() throws Exception
{
MonitorRequestMsg msg = new MonitorRequestMsg(1,2);
- MonitorRequestMsg newMsg = new MonitorRequestMsg(msg.getBytes());
+ MonitorRequestMsg newMsg = new MonitorRequestMsg(msg.getBytes(getCurrentVersion()));
assertEquals(newMsg.getDestination(), 2);
assertEquals(newMsg.getSenderID(), 1);
}
@@ -1251,7 +1252,7 @@
msg.setServerState(sid2, s2, now+2, true);
msg.setServerState(sid3, s3, now+3, false);
- byte[] b = msg.getBytes();
+ byte[] b = msg.getBytes(getCurrentVersion());
MonitorMsg newMsg = new MonitorMsg(b, ProtocolVersion.getCurrentVersion());
assertEquals(rsState, msg.getReplServerDbState());
@@ -1319,7 +1320,7 @@
int target = 45678;
byte[] entry = taskInitFromS2.getBytes();
EntryMsg msg = new EntryMsg(sender, target, entry, 1);
- EntryMsg newMsg = new EntryMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
+ EntryMsg newMsg = new EntryMsg(msg.getBytes(getCurrentVersion()),getCurrentVersion());
assertEquals(msg.getSenderID(), newMsg.getSenderID());
assertEquals(msg.getDestination(), newMsg.getDestination());
assertEquals(msg.getEntryBytes(), newMsg.getEntryBytes());
@@ -1335,7 +1336,7 @@
int target = 56789;
InitializeRequestMsg msg = new InitializeRequestMsg(
TEST_ROOT_DN_STRING, sender, target, 100);
- InitializeRequestMsg newMsg = new InitializeRequestMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
+ InitializeRequestMsg newMsg = new InitializeRequestMsg(msg.getBytes(getCurrentVersion()),getCurrentVersion());
assertEquals(msg.getSenderID(), newMsg.getSenderID());
assertEquals(msg.getDestination(), newMsg.getDestination());
assertTrue(msg.getBaseDn().equals(newMsg.getBaseDn()));
@@ -1355,7 +1356,7 @@
InitializeTargetMsg msg = new InitializeTargetMsg(
TEST_ROOT_DN_STRING, senderID, targetID, requestorID, entryCount, initWindow);
- InitializeTargetMsg newMsg = new InitializeTargetMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
+ InitializeTargetMsg newMsg = new InitializeTargetMsg(msg.getBytes(getCurrentVersion()),getCurrentVersion());
assertEquals(msg.getSenderID(), newMsg.getSenderID());
assertEquals(msg.getDestination(), newMsg.getDestination());
assertEquals(msg.getInitiatorID(), newMsg.getInitiatorID());
@@ -1377,7 +1378,7 @@
public void doneMsgTest() throws Exception
{
DoneMsg msg = new DoneMsg(1, 2);
- DoneMsg newMsg = new DoneMsg(msg.getBytes());
+ DoneMsg newMsg = new DoneMsg(msg.getBytes(getCurrentVersion()));
assertEquals(msg.getSenderID(), newMsg.getSenderID());
assertEquals(msg.getDestination(), newMsg.getDestination());
}
@@ -1389,7 +1390,7 @@
public void errorMsgTest() throws Exception
{
ErrorMsg msg = new ErrorMsg(1, 2, Message.raw("details"));
- ErrorMsg newMsg = new ErrorMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
+ ErrorMsg newMsg = new ErrorMsg(msg.getBytes(getCurrentVersion()),getCurrentVersion());
assertEquals(msg.getSenderID(), newMsg.getSenderID());
assertEquals(msg.getDestination(), newMsg.getDestination());
assertEquals(msg.getMsgID(), newMsg.getMsgID());
@@ -1421,8 +1422,8 @@
{
ServerStartECLMsg msg = new ServerStartECLMsg(
"localhost:1234", window, window, window, window, window, window, state,
- ProtocolVersion.getCurrentVersion(), genId, sslEncryption, groupId);
- ServerStartECLMsg newMsg = new ServerStartECLMsg(msg.getBytes());
+ genId, sslEncryption, groupId);
+ ServerStartECLMsg newMsg = new ServerStartECLMsg(msg.getBytes(getCurrentVersion()));
assertEquals(msg.getServerURL(), newMsg.getServerURL());
assertEquals(msg.getMaxReceiveDelay(), newMsg.getMaxReceiveDelay());
assertEquals(msg.getMaxReceiveQueue(), newMsg.getMaxReceiveQueue());
@@ -1433,7 +1434,7 @@
assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
assertEquals(msg.getServerState().getMaxChangeNumber(1),
newMsg.getServerState().getMaxChangeNumber(1));
- assertEquals(msg.getVersion(), newMsg.getVersion());
+ assertEquals(newMsg.getVersion(), getCurrentVersion());
assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
assertTrue(msg.getGroupId() == newMsg.getGroupId());
}
@@ -1469,7 +1470,7 @@
dns.add(dn2);
msg.setExcludedDNs(dns);
// create copy
- StartECLSessionMsg newMsg = new StartECLSessionMsg(msg.getBytes());
+ StartECLSessionMsg newMsg = new StartECLSessionMsg(msg.getBytes(getCurrentVersion()));
// test equality between the two copies
assertEquals(msg.getChangeNumber(), newMsg.getChangeNumber());
assertTrue(msg.isPersistent() == newMsg.isPersistent());
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 9821ce8..111be7a 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -918,7 +918,7 @@
// Send our repl server start msg
ReplServerStartMsg replServerStartMsg = new ReplServerStartMsg(serverId,
fakeUrl, baseDn, 100, serverState,
- ProtocolVersion.getCurrentVersion(), generationId, sslEncryption,
+ generationId, sslEncryption,
groupId, 5000);
session.publish(replServerStartMsg);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index a17a8f1..191351b 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -990,7 +990,7 @@
ServerStartMsg msg =
new ServerStartMsg( 1723, url, TEST_ROOT_DN_STRING,
WINDOW, 5000, new ServerState(),
- ProtocolVersion.getCurrentVersion(), 0, sslEncryption, (byte)-1);
+ 0, sslEncryption, (byte)-1);
session.publish(msg);
// Read the Replication Server state from the ReplServerStartDSMsg that
--
Gitblit v1.10.0