From 99480fcbcb68be6a357f6218668feab697e1a93d Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 07 Jul 2009 14:55:26 +0000
Subject: [PATCH] Fix for 4096 MonitorMsg is not compatible with replication version
---
opends/src/server/org/opends/server/replication/protocol/SocketSession.java | 11 +
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java | 19 -
opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java | 3
opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java | 3
opends/src/server/org/opends/server/replication/protocol/StartMsg.java | 16 +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java | 28 ++-
opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java | 8 +
opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java | 21 --
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java | 11 +
opends/src/server/org/opends/server/replication/server/ReplicationData.java | 6
opends/src/server/org/opends/server/replication/server/DataServerHandler.java | 10
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 4
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java | 15 -
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java | 116 ++++++++++++--
opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java | 4
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 3
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 21 +-
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 5
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java | 11 +
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 6
opends/src/server/org/opends/server/backends/SchemaBackend.java | 9
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 34 +++
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java | 113 ++++++--------
23 files changed, 297 insertions(+), 180 deletions(-)
diff --git a/opends/src/server/org/opends/server/backends/SchemaBackend.java b/opends/src/server/org/opends/server/backends/SchemaBackend.java
index 429c36d..056da63 100644
--- a/opends/src/server/org/opends/server/backends/SchemaBackend.java
+++ b/opends/src/server/org/opends/server/backends/SchemaBackend.java
@@ -122,6 +122,7 @@
private static final String CONFIG_SCHEMA_ELEMENTS_FILE = "02-config.ldif";
+ private static final String CORE_SCHEMA_ELEMENTS_FILE = "00-core.ldif";
@@ -4325,11 +4326,13 @@
{
String schemaFile = removeType.getSchemaFile();
if ((schemaFile != null) &&
- (schemaFile.equals(CONFIG_SCHEMA_ELEMENTS_FILE)))
+ ((schemaFile.equals(CONFIG_SCHEMA_ELEMENTS_FILE)) ||
+ (schemaFile.equals(CORE_SCHEMA_ELEMENTS_FILE))) )
{
- // Don't import the file containing the definitiong of the
+ // Don't import the file containing the definitions of the
// Schema elements used for configuration because these
// definitions may vary between versions of OpenDS.
+ // Also never delete anything from the core schema file.
continue;
}
if (!oidList.contains(removeType.getOID()))
@@ -4447,7 +4450,7 @@
if ((schemaFile != null) &&
(schemaFile.equals(CONFIG_SCHEMA_ELEMENTS_FILE)))
{
- // Don't import the file containing the definitiong of the
+ // Don't import the file containing the definition of the
// Schema elements used for configuration because these
// definitions may vary between versions of OpenDS.
continue;
diff --git a/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
index 0e8e8b5..590d83d 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
@@ -98,7 +98,9 @@
length = in.length - pos - 1;
byte[] encodedMsg = new byte[length];
System.arraycopy(in, pos, encodedMsg, 0, length);
- ReplicationMsg rmsg = ReplicationMsg.generateMsg(encodedMsg);
+ ReplicationMsg rmsg =
+ ReplicationMsg.generateMsg(
+ encodedMsg, ProtocolVersion.getCurrentVersion());
this.updateMsg = (LDAPUpdateMsg)rmsg;
}
catch (UnsupportedEncodingException e)
diff --git a/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
index 66624aa..3927cfd 100644
--- a/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -276,25 +276,10 @@
public byte[] getBytes(short reqProtocolVersion)
throws UnsupportedEncodingException
{
-
- // Using current protocol version should normally not be done as we would
- // normally call the getBytes() method instead for that. So this check
- // for security
- if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
- {
+ if (reqProtocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ return getBytes_V1();
+ else
return getBytes();
- }
-
- switch (reqProtocolVersion)
- {
- case ProtocolVersion.REPLICATION_PROTOCOL_V1:
- return getBytes_V1();
- default:
- // Unsupported requested version
- throw new UnsupportedEncodingException(getClass().getSimpleName() +
- " PDU does not support requested protocol version serialization: " +
- reqProtocolVersion);
- }
}
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index 94161aa..a70b5b2 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -26,6 +26,7 @@
*/
package org.opends.server.replication.protocol;
+import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -81,7 +82,12 @@
SubTopoMonitorData data = new SubTopoMonitorData();
/**
- * Creates a new EntryMessage.
+ * The protocolVersion that should be used when serializing this message.
+ */
+ private final short protocolVersion;
+
+ /**
+ * Creates a new MonitorMsg.
*
* @param sender The sender of this message.
* @param destination The destination of this message.
@@ -89,8 +95,25 @@
public MonitorMsg(short sender, short destination)
{
super(sender, destination);
+ protocolVersion = ProtocolVersion.getCurrentVersion();
}
+
+ /**
+ * Creates a new MonitorMsg with a specific protocol version.
+ *
+ * @param sender The sender of this message.
+ * @param destination The destination of this message.
+ * @param replicationProtocol The protocol version to use.
+ */
+ public MonitorMsg(short sender, short destination,
+ short replicationProtocol)
+ {
+ super(sender, destination);
+ protocolVersion = replicationProtocol;
+ }
+
+
/**
* Sets the state of the replication server.
* @param state The state.
@@ -174,24 +197,58 @@
/**
* Creates a new EntryMessage from its encoded form.
*
- * @param in The byte array containing the encoded form of the message.
+ * @param in The byte array containing the encoded form of the message.
+ * @param version The version of the protocol to use to decode the msg.
* @throws DataFormatException If the byte array does not contain a valid
* encoded form of the ServerStartMessage.
*/
- public MonitorMsg(byte[] in) throws DataFormatException
+ public MonitorMsg(byte[] in, short version) throws DataFormatException
{
+ protocolVersion = ProtocolVersion.getCurrentVersion();
ByteSequenceReader reader = ByteString.wrap(in).asReader();
- /* first byte is the type */
- if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
- throw new DataFormatException("input is not a valid " +
- this.getClass().getCanonicalName());
+ if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ {
+ try
+ {
+ /* first byte is the type */
+ if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR)
+ throw new DataFormatException("input is not a valid " +
+ this.getClass().getCanonicalName());
+ int pos = 1;
- // sender
- this.senderID = reader.getShort();
+ // sender
+ int length = getNextLength(in, pos);
+ String senderIDString = new String(in, pos, length, "UTF-8");
+ this.senderID = Short.valueOf(senderIDString);
+ pos += length +1;
- // destination
- this.destination = reader.getShort();
+ // destination
+ length = getNextLength(in, pos);
+ String destinationString = new String(in, pos, length, "UTF-8");
+ this.destination = Short.valueOf(destinationString);
+ pos += length +1;
+
+ reader.position(pos);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+ }
+ else
+ {
+ if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
+ throw new DataFormatException("input is not a valid " +
+ this.getClass().getCanonicalName());
+
+ // sender
+ this.senderID = reader.getShort();
+
+ // destination
+ this.destination = reader.getShort();
+ }
+
ASN1Reader asn1Reader = ASN1.getReader(reader);
try
@@ -262,11 +319,14 @@
ByteStringBuilder byteBuilder = new ByteStringBuilder();
ASN1Writer writer = ASN1.getWriter(byteBuilder);
- /* put the type of the operation */
- byteBuilder.append(MSG_TYPE_REPL_SERVER_MONITOR);
+ if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ {
+ /* put the type of the operation */
+ byteBuilder.append(MSG_TYPE_REPL_SERVER_MONITOR);
- byteBuilder.append(senderID);
- byteBuilder.append(destination);
+ byteBuilder.append(senderID);
+ byteBuilder.append(destination);
+ }
/* Put the serverStates ... */
writer.writeStartSequence();
@@ -331,7 +391,31 @@
writer.writeEndSequence();
- return byteBuilder.toByteArray();
+ if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ {
+ return byteBuilder.toByteArray();
+ }
+ else
+ {
+ byte[] temp = byteBuilder.toByteArray();
+
+ byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
+ byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
+
+ int length = 1 + 1 + senderBytes.length +
+ 1 + destinationBytes.length + temp.length +1;
+ byte[] resultByteArray = new byte[length];
+
+ /* put the type of the operation */
+ resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
+ int pos = 1;
+
+ pos = addByteArray(senderBytes, resultByteArray, pos);
+ pos = addByteArray(destinationBytes, resultByteArray, pos);
+ pos = addByteArray(temp, resultByteArray, pos);
+
+ return resultByteArray;
+ }
}
catch (Exception e)
{
diff --git a/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java b/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
index 6aace54..4825445 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
@@ -156,4 +156,12 @@
* on this ProtocolSession.
*/
public abstract boolean closeInitiated();
+
+ /**
+ * This method is called at the establishment of the session and can
+ * be used to record the version of the protocol that is currently used.
+ *
+ * @param version The version of the protocol that is currently used.
+ */
+ public abstract void setProtocolVersion(short version);
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
index 0201146..7127bd5 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -223,59 +223,70 @@
*/
@Override
public byte[] getBytes()
+ throws UnsupportedEncodingException
{
+ return getBytes(ProtocolVersion.getCurrentVersion());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes(short protocolVersion)
+ throws UnsupportedEncodingException
+ {
+ if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ return getBytes_V1();
+
/* The ReplServerStartMsg is stored in the form :
* <operation type><baseDn><serverId><serverURL><windowSize><sslEncryption>
* <degradedStatusThreshold><serverState>
*/
- try {
- byte[] byteDn = baseDn.getBytes("UTF-8");
- byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
- byte[] byteServerUrl = serverURL.getBytes("UTF-8");
- byte[] byteServerState = serverState.getBytes();
- byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
- byte[] byteSSLEncryption =
- String.valueOf(sslEncryption).getBytes("UTF-8");
- byte[] byteDegradedStatusThreshold =
- String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
- int length = byteDn.length + 1 + byteServerId.length + 1 +
- byteServerUrl.length + 1 + byteWindowSize.length + 1 +
- byteSSLEncryption.length + 1 +
- byteDegradedStatusThreshold.length + 1 +
- byteServerState.length + 1;
+ byte[] byteDn = baseDn.getBytes("UTF-8");
+ byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
+ byte[] byteServerUrl = serverURL.getBytes("UTF-8");
+ byte[] byteServerState = serverState.getBytes();
+ byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
+ byte[] byteSSLEncryption =
+ String.valueOf(sslEncryption).getBytes("UTF-8");
+ byte[] byteDegradedStatusThreshold =
+ String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
- /* encode the header in a byte[] large enough to also contain the mods */
- byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START, length);
- int pos = headerLength;
+ int length = byteDn.length + 1 + byteServerId.length + 1 +
+ byteServerUrl.length + 1 + byteWindowSize.length + 1 +
+ byteSSLEncryption.length + 1 +
+ byteDegradedStatusThreshold.length + 1 +
+ byteServerState.length + 1;
- /* put the baseDN and a terminating 0 */
- pos = addByteArray(byteDn, resultByteArray, pos);
+ /* encode the header in a byte[] large enough to also contain the mods */
+ byte resultByteArray[] =
+ encodeHeader(MSG_TYPE_REPL_SERVER_START, length, protocolVersion);
- /* put the ServerId */
- pos = addByteArray(byteServerId, resultByteArray, pos);
+ int pos = headerLength;
- /* put the ServerURL */
- pos = addByteArray(byteServerUrl, resultByteArray, pos);
+ /* put the baseDN and a terminating 0 */
+ pos = addByteArray(byteDn, resultByteArray, pos);
- /* put the window size */
- pos = addByteArray(byteWindowSize, resultByteArray, pos);
+ /* put the ServerId */
+ pos = addByteArray(byteServerId, resultByteArray, pos);
- /* put the SSL Encryption setting */
- pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
+ /* put the ServerURL */
+ pos = addByteArray(byteServerUrl, resultByteArray, pos);
- /* put the degraded status threshold */
- pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
+ /* put the window size */
+ pos = addByteArray(byteWindowSize, resultByteArray, pos);
- /* put the ServerState */
- pos = addByteArray(byteServerState, resultByteArray, pos);
+ /* put the SSL Encryption setting */
+ pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
- return resultByteArray;
- }
- catch (UnsupportedEncodingException e)
- {
- return null;
- }
+ /* put the degraded status threshold */
+ pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
+
+ /* put the ServerState */
+ pos = addByteArray(byteServerState, resultByteArray, pos);
+
+ return resultByteArray;
}
/**
@@ -338,32 +349,6 @@
}
/**
- * {@inheritDoc}
- */
- @Override
- public byte[] getBytes(short reqProtocolVersion)
- throws UnsupportedEncodingException
- {
- // Of course, always support current protocol version
- if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
- {
- return getBytes();
- }
-
- // Supported older protocol versions
- switch (reqProtocolVersion)
- {
- case ProtocolVersion.REPLICATION_PROTOCOL_V1:
- return getBytes_V1();
- default:
- // Unsupported requested version
- throw new UnsupportedEncodingException(getClass().getSimpleName() +
- " PDU does not support requested protocol version serialization: " +
- reqProtocolVersion);
- }
- }
-
- /**
* Get the byte array representation of this Message. This uses the version
* 1 of the replication protocol (used for compatibility purpose).
*
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index 3f70856..5ef9135 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -125,14 +125,19 @@
* is done taking into account the various supported replication protocol
* versions.
*
- * @param buffer The encode form of the ReplicationMsg.
+ * @param buffer The encode form of the ReplicationMsg.
+ * @param version The version to use to decode the msg.
+ *
* @return The generated SycnhronizationMessage.
+ *
* @throws DataFormatException If the encoded form was not a valid msg.
* @throws UnsupportedEncodingException If UTF8 is not supported.
* @throws NotSupportedOldVersionPDUException If the PDU is part of an old
* protocol version and we do not support it.
*/
- public static ReplicationMsg generateMsg(byte[] buffer)
+ public static ReplicationMsg generateMsg(
+ byte[] buffer,
+ short version)
throws DataFormatException, UnsupportedEncodingException,
NotSupportedOldVersionPDUException
{
@@ -207,7 +212,7 @@
msg = new MonitorRequestMsg(buffer);
break;
case MSG_TYPE_REPL_SERVER_MONITOR:
- msg = new MonitorMsg(buffer);
+ msg = new MonitorMsg(buffer, version);
break;
case MSG_TYPE_START_SESSION:
msg = new StartSessionMsg(buffer);
diff --git a/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java b/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
index 366ff2d..3081c12 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
@@ -291,7 +291,8 @@
byteServerState.length + 1;
/* encode the header in a byte[] large enough to also contain the mods */
- byte resultByteArray[] = encodeHeader(MSG_TYPE_START_ECL, length);
+ byte resultByteArray[] = encodeHeader(
+ MSG_TYPE_START_ECL, length, ProtocolVersion.getCurrentVersion());
int pos = headerLength;
pos = addByteArray(byteServerUrl, resultByteArray, pos);
diff --git a/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java b/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
index 16a325b..e4991cb 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
@@ -327,7 +327,8 @@
byteServerState.length + 1;
/* encode the header in a byte[] large enough to also contain the mods */
- byte resultByteArray[] = encodeHeader(MSG_TYPE_SERVER_START, length);
+ byte resultByteArray[] = encodeHeader(
+ MSG_TYPE_SERVER_START, length, ProtocolVersion.getCurrentVersion());
int pos = headerLength;
pos = addByteArray(byteDn, resultByteArray, pos);
diff --git a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
index e20bffa..0631618 100644
--- a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -69,6 +69,7 @@
private boolean closeInitiated = false;
+ private short protocolVersion = ProtocolVersion.getCurrentVersion();
/**
* Creates a new SocketSession based on the provided socket.
@@ -175,7 +176,7 @@
/* We do not want the heartbeat to close the session when */
/* we are processing a message even a time consuming one. */
lastReceiveTime=0;
- return ReplicationMsg.generateMsg(buffer);
+ return ReplicationMsg.generateMsg(buffer, protocolVersion);
}
catch (OutOfMemoryError e)
{
@@ -243,4 +244,12 @@
{
return closeInitiated;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setProtocolVersion(short version)
+ {
+ protocolVersion = version;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/StartMsg.java b/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
index aa352cb..2ec19b7 100644
--- a/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -75,17 +75,21 @@
* Encode the header for the start message.
*
* @param type The type of the message to create.
- * @param additionalLength additional length needed to encode the remaining
+ * @param additionalLength Additional length needed to encode the remaining
* part of the UpdateMessage.
+ * @param protocolVersion The version to use when encoding the header.
* @return a byte array containing the common header and enough space to
* encode the remaining bytes of the UpdateMessage as was specified
* by the additionalLength.
* (byte array length = common header length + additionalLength)
* @throws UnsupportedEncodingException if UTF-8 is not supported.
*/
- public byte[] encodeHeader(byte type, int additionalLength)
+ public byte[] encodeHeader(
+ byte type, int additionalLength,
+ short protocolVersion)
throws UnsupportedEncodingException
{
+
byte[] byteGenerationID =
String.valueOf(generationId).getBytes("UTF-8");
@@ -101,7 +105,7 @@
encodedMsg[0] = type;
/* put the protocol version */
- encodedMsg[1] = (byte)ProtocolVersion.getCurrentVersion();
+ encodedMsg[1] = (byte)protocolVersion;
/* put the generationId */
int pos = 2;
@@ -192,11 +196,11 @@
{
/* then read the version */
short readVersion = (short)encodedMsg[1];
- if (readVersion != ProtocolVersion.getCurrentVersion())
+ if (readVersion < ProtocolVersion.REPLICATION_PROTOCOL_V2)
throw new DataFormatException("Not a valid message: type is " +
encodedMsg[0] + " but protocol version byte is " + readVersion +
" instead of " + ProtocolVersion.getCurrentVersion());
- protocolVersion = ProtocolVersion.getCurrentVersion();
+ protocolVersion = readVersion;
/* read the generationId */
int pos = 2;
diff --git a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index 4eda8e5..4b2fc59 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -71,6 +71,7 @@
private boolean closeInitiated = false;
+ private short protocolVersion = ProtocolVersion.getCurrentVersion();
/**
* Creates a new TLSSocketSession.
@@ -185,7 +186,7 @@
/* We do not want the heartbeat to close the session when */
/* we are processing a message even a time consuming one. */
lastReceiveTime=0;
- return ReplicationMsg.generateMsg(buffer);
+ return ReplicationMsg.generateMsg(buffer, protocolVersion);
}
catch (OutOfMemoryError e)
{
@@ -254,4 +255,12 @@
{
return closeInitiated;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setProtocolVersion(short version)
+ {
+ protocolVersion = version;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
index ca50458..5152c9a 100644
--- a/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -194,17 +194,8 @@
public byte[] getBytes(short reqProtocolVersion)
throws UnsupportedEncodingException
{
- // Of course, always support current protocol version
- if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
- {
- return getBytes();
- }
- else
- {
- throw new UnsupportedEncodingException(getClass().getSimpleName() +
- " PDU does not support requested protocol version serialization: " +
- reqProtocolVersion);
- }
+ // There was no change since version 2.
+ return getBytes();
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 8494127..8a2fe99 100644
--- a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -435,13 +435,13 @@
*/
public void registerIntoDomain()
{
- // Alright, connected with new DS: store handler.
+ // All-right, connected with new DS: store handler.
Map<Short, DataServerHandler> connectedDSs =
replicationServerDomain.getConnectedDSs();
connectedDSs.put(serverId, this);
// Tell peer DSs a new DS just connected to us
- // No need to resend topo msg to this just new DS so not null
+ // No need to re-send TopologyMsg to this just new DS so not null
// argument
replicationServerDomain.buildAndSendTopoInfoToDSs(this);
// Tell peer RSs a new DS just connected to us
@@ -499,13 +499,13 @@
ReplServerStartMsg outReplServerStartMsg = null;
try
{
- outReplServerStartMsg = sendStartToRemote((short)-1);
+ outReplServerStartMsg = sendStartToRemote(protocolVersion);
// log
logStartHandshakeRCVandSND(inServerStartMsg, outReplServerStartMsg);
// The session initiator decides whether to use SSL.
- // Until here session is encrypted then it depends on the negociation
+ // Until here session is encrypted then it depends on the negotiation
if (!sessionInitiatorSSLEncryption)
session.stopEncryption();
@@ -524,7 +524,7 @@
// aborted after handshake phase one from a DS that is searching for
// best suitable RS.
- // don't log a poluting error when connection aborted
+ // don't log a polluting error when connection aborted
// from a DS that wanted only to perform handshake phase 1 in order
// to determine the best suitable RS:
// 1) -> ServerStartMsg
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 1bc0d14..d0565d6 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -299,7 +299,8 @@
lockDomain(true);
// send start to remote
- ReplServerStartMsg outReplServerStartMsg = sendStartToRemote((short)-1);
+ ReplServerStartMsg outReplServerStartMsg =
+ sendStartToRemote(protocolVersion);
// log
logStartHandshakeRCVandSND(inECLStartMsg, outReplServerStartMsg);
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationData.java b/opends/src/server/org/opends/server/replication/server/ReplicationData.java
index d3bf417..4ac650b 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationData.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationData.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
@@ -30,6 +30,7 @@
import com.sleepycat.je.DatabaseEntry;
+import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -67,6 +68,7 @@
public static UpdateMsg generateChange(byte[] data)
throws Exception
{
- return (UpdateMsg) ReplicationMsg.generateMsg(data);
+ return (UpdateMsg) ReplicationMsg.generateMsg(
+ data, ProtocolVersion.getCurrentVersion());
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index e72f3f7..77d068f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -75,6 +75,7 @@
import org.opends.server.replication.protocol.ServerStartECLMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
+import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
@@ -299,18 +300,21 @@
if (msg instanceof ServerStartMsg)
{
+ session.setProtocolVersion(((StartMsg)msg).getVersion());
DataServerHandler handler = new DataServerHandler(session,
queueSize,serverURL,serverId,this,rcvWindow);
handler.startFromRemoteDS((ServerStartMsg)msg);
}
else if (msg instanceof ReplServerStartMsg)
{
+ session.setProtocolVersion(((StartMsg)msg).getVersion());
ReplicationServerHandler handler = new ReplicationServerHandler(
session,queueSize,serverURL,serverId,this,rcvWindow);
handler.startFromRemoteRS((ReplServerStartMsg)msg);
}
else if (msg instanceof ServerStartECLMsg)
{
+ session.setProtocolVersion(((StartMsg)msg).getVersion());
ECLServerHandler handler = new ECLServerHandler(
session,queueSize,serverURL,serverId,this,rcvWindow);
handler.startFromRemoteServer((ServerStartECLMsg)msg);
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index baa4737..3bae821 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1509,8 +1509,21 @@
// in the topology.
if (senderHandler.isDataServer())
{
- MonitorMsg returnMsg =
+ MonitorMsg returnMsg;
+
+ if (senderHandler.getProtocolVersion() >
+ ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ {
+ returnMsg =
new MonitorMsg(msg.getDestination(), msg.getsenderID());
+ }
+ else
+ {
+ returnMsg =
+ new MonitorMsg(msg.getDestination(), msg.getsenderID(),
+ ProtocolVersion.REPLICATION_PROTOCOL_V1);
+ }
+
try
{
returnMsg.setReplServerDbState(getDbServerState());
@@ -1555,13 +1568,20 @@
return;
}
- MonitorRequestMsg replServerMonitorRequestMsg =
- (MonitorRequestMsg) msg;
+ MonitorMsg monitorMsg;
- MonitorMsg monitorMsg =
- new MonitorMsg(
- replServerMonitorRequestMsg.getDestination(),
- replServerMonitorRequestMsg.getsenderID());
+ if (senderHandler.getProtocolVersion() >
+ ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ {
+ monitorMsg =
+ new MonitorMsg(msg.getDestination(), msg.getsenderID());
+ }
+ else
+ {
+ monitorMsg =
+ new MonitorMsg(msg.getDestination(), msg.getsenderID(),
+ ProtocolVersion.REPLICATION_PROTOCOL_V1);
+ }
// Populate for each connected LDAP Server
// from the states stored in the serverHandler.
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 8d5d9f6..019cbf5 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -85,6 +85,7 @@
{
protocolVersion = ProtocolVersion.minWithCurrent(
inReplServerStartMsg.getVersion());
+ session.setProtocolVersion(protocolVersion);
generationId = inReplServerStartMsg.getGenerationId();
serverId = inReplServerStartMsg.getServerId();
serverURL = inReplServerStartMsg.getServerURL();
@@ -163,13 +164,14 @@
try
{
//
- lockDomain(false); // notimeout
+ lockDomain(false); // no timeout
// we are the initiator and decides of the encryption
boolean sessionInitiatorSSLEncryption = this.initSslEncryption;
// Send start
- ReplServerStartMsg outReplServerStartMsg = sendStartToRemote((short)-1);
+ ReplServerStartMsg outReplServerStartMsg =
+ sendStartToRemote(ProtocolVersion.getCurrentVersion());
// Wait answer
ReplicationMsg msg = session.receive();
@@ -260,20 +262,13 @@
// lock with timeout
lockDomain(true);
- short reqVersion = -1;
- if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
- {
- // We support connection from a V1 RS, send PDU with V1 form
- reqVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
- }
-
- // send start to remote
- ReplServerStartMsg outReplServerStartMsg = sendStartToRemote(reqVersion);
+ ReplServerStartMsg outReplServerStartMsg =
+ sendStartToRemote(protocolVersion);
// log
logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg);
- // until here session is encrypted then it depends on the negociation
+ // until here session is encrypted then it depends on the negotiation
// The session initiator decides whether to use SSL.
if (!sessionInitiatorSSLEncryption)
session.stopEncryption();
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 24dd0a4..384dcae 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -1042,10 +1042,7 @@
replicationServerDomain.
getReplicationServer().getDegradedStatusThreshold());
- if (requestedProtocolVersion>0)
- session.publish(outReplServerStartMsg, requestedProtocolVersion);
- else
- session.publish(outReplServerStartMsg);
+ session.publish(outReplServerStartMsg, requestedProtocolVersion);
return outReplServerStartMsg;
}
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 9bab16c..e904311 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -765,9 +765,10 @@
* The replication server will use the same one (or an older one
* if it is an old replication server).
*/
- if (keepConnection)
- protocolVersion = ProtocolVersion.minWithCurrent(
+ protocolVersion = ProtocolVersion.minWithCurrent(
replServerStartMsg.getVersion());
+ localSession.setProtocolVersion(protocolVersion);
+
if (!isSslEncryption)
{
@@ -926,6 +927,7 @@
if (keepConnection)
protocolVersion = ProtocolVersion.minWithCurrent(
replServerStartMsg.getVersion());
+ localSession.setProtocolVersion(protocolVersion);
if (!isSslEncryption)
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
index 1f2aad5..1bae3dd 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -236,7 +236,8 @@
byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
// Un-serialize V1 message
- AddMsg newMsg = (AddMsg)ReplicationMsg.generateMsg(v1MsgBytes);
+ AddMsg newMsg = (AddMsg)ReplicationMsg.generateMsg(
+ v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
// Check original version of message
assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -273,7 +274,8 @@
newMsg.setSafeDataLevel(safeDataLevel);
// Serialize in VLAST msg
- AddMsg vlastMsg = (AddMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
+ AddMsg vlastMsg = (AddMsg)ReplicationMsg.generateMsg(
+ newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
// Check original version of message
assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -351,7 +353,8 @@
byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
// Un-serialize V1 message
- DeleteMsg newMsg = (DeleteMsg)ReplicationMsg.generateMsg(v1MsgBytes);
+ DeleteMsg newMsg = (DeleteMsg)ReplicationMsg.generateMsg(
+ v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
// Check original version of message
assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -371,7 +374,8 @@
newMsg.setSafeDataLevel(safeDataLevel);
// Serialize in VLAST msg
- DeleteMsg vlastMsg = (DeleteMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
+ DeleteMsg vlastMsg = (DeleteMsg)ReplicationMsg.generateMsg(
+ newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
// Check original version of message
assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -484,7 +488,8 @@
byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
// Un-serialize V1 message
- ModifyMsg newMsg = (ModifyMsg)ReplicationMsg.generateMsg(v1MsgBytes);
+ ModifyMsg newMsg = (ModifyMsg)ReplicationMsg.generateMsg(
+ v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
// Check original version of message
assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -518,7 +523,8 @@
newMsg.setSafeDataLevel(safeDataLevel);
// Serialize in VLAST msg
- ModifyMsg vlastMsg = (ModifyMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
+ ModifyMsg vlastMsg = (ModifyMsg)ReplicationMsg.generateMsg(
+ newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
// Check original version of message
assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -548,7 +554,7 @@
@DataProvider(name = "createModifyDnData")
public Object[][] createModifyDnData() {
-
+
AttributeType type = DirectoryServer.getAttributeType("description");
Attribute attr1 = Attributes.create("description", "new value");
@@ -579,7 +585,7 @@
Modification mod = new Modification(ModificationType.ADD, attr);
mods4.add(mod);
}
-
+
return new Object[][] {
{"dc=test,dc=com", "dc=new", "11111111-1111-1111-1111-111111111111", "22222222-2222-2222-2222-222222222222", false, "dc=change", mods1, false, AssuredMode.SAFE_DATA_MODE, (byte)0},
{"dc=test,dc=com", "dc=new", "33333333-3333-3333-3333-333333333333", "44444444-4444-4444-4444-444444444444", true, "dc=change", mods2, true, AssuredMode.SAFE_READ_MODE, (byte)1},
@@ -632,7 +638,8 @@
byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
// Un-serialize V1 message
- ModifyDNMsg newMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(v1MsgBytes);
+ ModifyDNMsg newMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(
+ v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
// Check original version of message
assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -672,7 +679,8 @@
newMsg.setMods(mods);
// Serialize in VLAST msg
- ModifyDNMsg vlastMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
+ ModifyDNMsg vlastMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(
+ newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
// Check original version of message
assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index de2be1e..ea14331 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -177,8 +177,8 @@
msg.setAssuredMode(assuredMode);
msg.setSafeDataLevel(safeDataLevel);
- ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg
- .generateMsg(msg.getBytes());
+ ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
+ msg.getBytes(), ProtocolVersion.getCurrentVersion());
// Test that generated attributes match original attributes.
assertEquals(generatedMsg.isAssured(), isAssured);
@@ -234,8 +234,8 @@
assertTrue(msg.getSafeDataLevel() == safeDataLevel);
// Check equals
- ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg
- .generateMsg(msg.getBytes());
+ ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
+ msg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
assertFalse(msg.equals(null));
assertFalse(msg.equals(new Object()));
@@ -298,8 +298,8 @@
(short) 123, (short) 45);
op.setAttachment(SYNCHROCONTEXT, new DeleteContext(cn, "uniqueid"));
DeleteMsg msg = new DeleteMsg(op);
- DeleteMsg generatedMsg = (DeleteMsg) ReplicationMsg
- .generateMsg(msg.getBytes());
+ DeleteMsg generatedMsg = (DeleteMsg) ReplicationMsg.generateMsg(
+ msg.getBytes(), ProtocolVersion.getCurrentVersion());
assertEquals(msg.toString(), generatedMsg.toString());
@@ -393,7 +393,7 @@
msg.setSafeDataLevel(safeDataLevel);
ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg
- .generateMsg(msg.getBytes());
+ .generateMsg(msg.getBytes(), ProtocolVersion.getCurrentVersion());
// Test that generated attributes match original attributes.
assertEquals(generatedMsg.isAssured(), isAssured);
@@ -466,7 +466,7 @@
msg.setSafeDataLevel(safeDataLevel);
AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg(msg
- .getBytes());
+ .getBytes(), ProtocolVersion.getCurrentVersion());
assertEquals(msg.getBytes(), generatedMsg.getBytes());
assertEquals(msg.toString(), generatedMsg.toString());
@@ -611,7 +611,8 @@
}
// Check that retrieved CN is OK
- msg2 = (AckMsg) ReplicationMsg.generateMsg(msg1.getBytes());
+ msg2 = (AckMsg) ReplicationMsg.generateMsg(
+ msg1.getBytes(), ProtocolVersion.getCurrentVersion());
}
@Test()
@@ -1006,7 +1007,7 @@
msg.setServerState(sid3, s3, now+3, false);
byte[] b = msg.getBytes();
- MonitorMsg newMsg = new MonitorMsg(b);
+ MonitorMsg newMsg = new MonitorMsg(b, ProtocolVersion.getCurrentVersion());
assertEquals(rsState, msg.getReplServerDbState());
assertEquals(newMsg.getReplServerDbState().toString(),
--
Gitblit v1.10.0