From a5c5efbf8ca56c059709953f7fedb647dadaed06 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improves the replica initialization protocol, especially flow control and handling connection outage.
---
opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java | 90 +++++++++++++++++++++++++++++++++++++++------
1 files changed, 78 insertions(+), 12 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java b/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
index 8d1c0c1..c1321a6 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
@@ -22,13 +22,14 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2009 Sun Microsystems, Inc.
+ * Copyright 2006-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
import org.opends.messages.Message;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
@@ -51,6 +52,10 @@
// Specifies the complementary details about the error that was detected
private Message details = null;
+ // The time of creation of this message.
+ // protocol version previous to V4
+ private Long creationTime = System.currentTimeMillis();
+
/**
* Creates an ErrorMsg providing the destination server.
*
@@ -64,9 +69,11 @@
super(sender, destination);
this.msgID = details.getDescriptor().getId();
this.details = details;
+ this.creationTime = System.currentTimeMillis();
if (debugEnabled())
- TRACER.debugInfo(" Creating error message" + this.toString());
+ TRACER.debugInfo(" Creating error message" + this.toString()
+ + " " + stackTraceToSingleLineString(new Exception("trace")));
}
/**
@@ -80,6 +87,7 @@
super(-2, i);
this.msgID = details.getDescriptor().getId();
this.details = details;
+ this.creationTime = System.currentTimeMillis();
if (debugEnabled())
TRACER.debugInfo(this.toString());
@@ -89,17 +97,20 @@
* Creates a new ErrorMsg by decoding the provided byte array.
*
* @param in A byte array containing the encoded information for the Message
+ * @param version The protocol version to use to decode the msg.
* @throws DataFormatException If the in does not contain a properly
* encoded message.
*/
- public ErrorMsg(byte[] in) throws DataFormatException
+ public ErrorMsg(byte[] in, short version)
+ throws DataFormatException
{
super();
try
{
/* first byte is the type */
if (in[0] != MSG_TYPE_ERROR)
- throw new DataFormatException("input is not a valid InitializeMessage");
+ throw new DataFormatException("input is not a valid " +
+ this.getClass().getCanonicalName());
int pos = 1;
// sender
@@ -125,6 +136,14 @@
details = Message.raw(new String(in, pos, length, "UTF-8"));
pos += length +1;
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // Creation Time
+ length = getNextLength(in, pos);
+ String creationTimeString = new String(in, pos, length, "UTF-8");
+ creationTime = Long.valueOf(creationTimeString);
+ pos += length +1;
+ }
}
catch (UnsupportedEncodingException e)
{
@@ -133,9 +152,9 @@
}
/**
- * Get the base DN from this InitializeMessage.
+ * Get the details from this message.
*
- * @return the base DN from this InitializeMessage.
+ * @return the details from this message.
*/
public Message getDetails()
{
@@ -143,35 +162,52 @@
}
/**
- * Get the base DN from this InitializeMessage.
+ * Get the msgID from this message.
*
- * @return the base DN from this InitializeMessage.
+ * @return the msgID from this message.
*/
public int getMsgID()
{
return msgID;
}
+ // ============
+ // Msg encoding
+ // ============
/**
* {@inheritDoc}
*/
@Override
public byte[] getBytes()
+ throws UnsupportedEncodingException
{
- /* The InitializeMessage is stored in the form :
- * <operation type><basedn><serverid>
- */
+ return getBytes(ProtocolVersion.getCurrentVersion());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes(short version)
+ {
try {
byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
byte[] byteErrMsgId = String.valueOf(msgID).getBytes("UTF-8");
byte[] byteDetails = details.toString().getBytes("UTF-8");
+ byte[] byteCreationTime = null;
int length = 1 + byteSender.length + 1
+ byteDestination.length + 1
+ byteErrMsgId.length + 1
+ byteDetails.length + 1;
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ byteCreationTime = creationTime.toString().getBytes("UTF-8");
+ length += byteCreationTime.length + 1;
+ }
+
byte[] resultByteArray = new byte[length];
// put the type of the operation
@@ -190,6 +226,12 @@
// details
pos = addByteArray(byteDetails, resultByteArray, pos);
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // creation time
+ pos = addByteArray(byteCreationTime, resultByteArray, pos);
+ }
+
return resultByteArray;
}
catch (UnsupportedEncodingException e)
@@ -209,6 +251,30 @@
" sender=" + this.senderID +
" destination=" + this.destination +
" msgID=" + this.msgID +
- " details=" + this.details + "]";
+ " details=" + this.details +
+ " creationTime=" + this.creationTime + "]";
}
+
+ /**
+ * Get the creation time of this message.
+ * When several attempts of initialization are done sequentially, it helps
+ * sorting the good ones, from the ones that relate to ended initialization
+ * when they are received.
+ *
+ * @return the creation time of this message.
+ */
+ public Long getCreationTime()
+ {
+ return creationTime;
+ }
+
+ /**
+ * Get the creation time of this message.
+ * @param creationTime the creation time of this message.
+ */
+ public void setCreationTime(long creationTime)
+ {
+ this.creationTime = creationTime;
+ }
+
}
--
Gitblit v1.10.0