From a5c5efbf8ca56c059709953f7fedb647dadaed06 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improves the replica initialization protocol, especially flow control and handling connection outage.
---
opends/src/server/org/opends/server/replication/protocol/EntryMsg.java | 75 +++++++++++++++++++++++++++++++++----
1 files changed, 66 insertions(+), 9 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java b/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
index 270a36c..e6bbbb6 100644
--- a/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2009 Sun Microsystems, Inc.
+ * Copyright 2006-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -39,6 +39,7 @@
{
// The byte array containing the bytes of the entry transported
private byte[] entryByteArray;
+ private int msgId = -1; // from V4
/**
* Creates a new EntryMsg.
@@ -46,52 +47,60 @@
* @param sender The sender of this message.
* @param destination The destination of this message.
* @param entryBytes The bytes of the entry.
+ * @param msgId Message counter.
*/
public EntryMsg(
int sender,
int destination,
- byte[] entryBytes)
+ byte[] entryBytes,
+ int msgId)
{
super(sender, destination);
this.entryByteArray = new byte[entryBytes.length];
System.arraycopy(entryBytes, 0, this.entryByteArray, 0, entryBytes.length);
+ this.msgId = msgId;
}
/**
* Creates a new EntryMsg.
*
- * @param serverID The sender of this message.
- * @param i The destination of this message.
- * @param entryBytes The bytes of the entry.
+ * @param serverID The sender of this message.
+ * @param i The destination of this message.
+ * @param entryBytes The bytes of the entry.
* @param pos The starting Position in the array.
* @param length Number of array elements to be copied.
+ * @param msgId Message counter.
*/
public EntryMsg(
int serverID,
int i,
byte[] entryBytes,
int pos,
- int length)
+ int length,
+ int msgId)
{
super(serverID, i);
this.entryByteArray = new byte[length];
System.arraycopy(entryBytes, pos, this.entryByteArray, 0, length);
+ this.msgId = msgId;
}
/**
* Creates a new EntryMsg from its encoded form.
*
* @param in The byte array containing the encoded form of the message.
+ * @param version The protocol version to use to decode the msg
* @throws DataFormatException If the byte array does not contain a valid
* encoded form of the ServerStartMessage.
*/
- public EntryMsg(byte[] in) throws DataFormatException
+ public EntryMsg(byte[] in, short version) throws DataFormatException
{
try
{
/* first byte is the type */
if (in[0] != MSG_TYPE_ENTRY)
- throw new DataFormatException("input is not a valid ServerStart msg");
+ throw new DataFormatException("input is not a valid " +
+ this.getClass().getCanonicalName());
int pos = 1;
// sender
@@ -107,12 +116,22 @@
pos += length +1;
// entry
- length = in.length - (pos + 1);
+ length = getNextLength(in, pos);
this.entryByteArray = new byte[length];
for (int i=0; i<length; i++)
{
entryByteArray[i] = in[pos+i];
}
+ pos += length +1;
+
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // msgCnt
+ length = getNextLength(in, pos);
+ String msgcntString = new String(in, pos, length, "UTF-8");
+ this.msgId = Integer.valueOf(msgcntString);
+ pos += length +1;
+ }
}
catch (UnsupportedEncodingException e)
{
@@ -134,16 +153,33 @@
*/
@Override
public byte[] getBytes()
+ throws UnsupportedEncodingException
+ {
+ return getBytes(ProtocolVersion.getCurrentVersion());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes(short version)
{
try {
byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
+ byte[] msgCntBytes = null;
byte[] entryBytes = entryByteArray;
int length = 1 + senderBytes.length +
1 + destinationBytes.length +
1 + entryBytes.length + 1;
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ msgCntBytes = String.valueOf(msgId).getBytes("UTF-8");
+ length += (1 + msgCntBytes.length);
+ }
+
byte[] resultByteArray = new byte[length];
/* put the type of the operation */
@@ -153,6 +189,9 @@
pos = addByteArray(senderBytes, resultByteArray, pos);
pos = addByteArray(destinationBytes, resultByteArray, pos);
pos = addByteArray(entryBytes, resultByteArray, pos);
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ pos = addByteArray(msgCntBytes, resultByteArray, pos);
+
return resultByteArray;
}
catch (UnsupportedEncodingException e)
@@ -160,4 +199,22 @@
return null;
}
}
+
+ /**
+ * Return the msg id.
+ * @return The msg id.
+ */
+ public int getMsgId()
+ {
+ return this.msgId;
+ }
+
+ /**
+ * Set the msg id.
+ * @param msgId The msg id.
+ */
+ public void setMsgId(int msgId)
+ {
+ this.msgId = msgId;
+ }
}
--
Gitblit v1.10.0