From a5c5efbf8ca56c059709953f7fedb647dadaed06 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improves the replica initialization protocol, especially flow control and handling connection outage.
---
opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java | 86 +++++++++++++++++++++++++++++++++++++------
1 files changed, 74 insertions(+), 12 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java b/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
index a477f0c..034246a 100644
--- a/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2009 Sun Microsystems, Inc.
+ * Copyright 2006-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -47,31 +47,37 @@
// is related to its own request.
private int requestorID;
+ private int initWindow;
+
/**
- * Creates a InitializeDestinationMessage.
+ * Creates a InitializeTargetMsg.
*
- * @param baseDN The base DN for which the InitializeMessage is created.
- * @param serverID The serverID of the server that sends this message.
- * @param target The destination of this message.
- * @param target2 The server that initiates this export.
+ * @param baseDN The base DN for which the InitializeMessage is created.
+ * @param serverID The serverID of the server that sends this message.
+ * @param target The destination of this message.
+ * @param target2 The server that initiates this export.
* @param entryCount The count of entries that will be sent.
+ * @param initWindow the initialization window.
*/
public InitializeTargetMsg(String baseDN, int serverID,
- int target, int target2, long entryCount)
+ int target, int target2, long entryCount, int initWindow)
{
super(serverID, target);
this.requestorID = target2;
this.baseDN = baseDN;
this.entryCount = entryCount;
+ this.initWindow = initWindow; // V4
}
/**
* Creates an InitializeTargetMsg by decoding the provided byte array.
* @param in A byte array containing the encoded information for the Message
+ * @param version The protocol version to use to decode the msg
* @throws DataFormatException If the in does not contain a properly
* encoded InitializeMessage.
*/
- public InitializeTargetMsg(byte[] in) throws DataFormatException
+ public InitializeTargetMsg(byte[] in, short version)
+ throws DataFormatException
{
super();
try
@@ -111,6 +117,14 @@
entryCount = Long.valueOf(entryCountString);
pos += length +1;
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // init window
+ length = getNextLength(in, pos);
+ String initWindowString = new String(in, pos, length, "UTF-8");
+ initWindow = Integer.valueOf(initWindowString);
+ pos += length +1;
+ }
}
catch (UnsupportedEncodingException e)
{
@@ -129,9 +143,12 @@
/**
* Get the serverID of the server that initiated the export.
+ * Roughly it is the server running the task,
+ * - the importer for the Initialize task,
+ * - the exporter for the InitializeRemote task.
* @return the serverID
*/
- public long getRequestorID()
+ public long getInitiatorID()
{
return this.requestorID;
}
@@ -143,14 +160,38 @@
*/
public String getBaseDN()
{
- return baseDN;
+ return this.baseDN;
+ }
+
+ /**
+ * Get the initializationWindow.
+ *
+ * @return the initialization window.
+ */
+ public int getInitWindow()
+ {
+ return this.initWindow;
+ }
+
+ // ============
+ // Msg encoding
+ // ============
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ throws UnsupportedEncodingException
+ {
+ return getBytes(ProtocolVersion.getCurrentVersion());
}
/**
* {@inheritDoc}
*/
@Override
- public byte[] getBytes()
+ public byte[] getBytes(short version)
+ throws UnsupportedEncodingException
{
try
{
@@ -159,13 +200,19 @@
byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
byte[] byteRequestor = String.valueOf(requestorID).getBytes("UTF-8");
byte[] byteEntryCount = String.valueOf(entryCount).getBytes("UTF-8");
-
+ byte[] byteInitWindow = null;
int length = 1 + byteDestination.length + 1
+ byteDn.length + 1
+ byteSender.length + 1
+ byteRequestor.length + 1
+ byteEntryCount.length + 1;
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ byteInitWindow = String.valueOf(initWindow).getBytes("UTF-8");
+ length += byteInitWindow.length + 1;
+ }
+
byte[] resultByteArray = new byte[length];
/* put the type of the operation */
@@ -187,6 +234,12 @@
/* put the entryCount */
pos = addByteArray(byteEntryCount, resultByteArray, pos);
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ /* put the initWindow */
+ pos = addByteArray(byteInitWindow, resultByteArray, pos);
+ }
+
return resultByteArray;
}
catch (UnsupportedEncodingException e)
@@ -194,4 +247,13 @@
return null;
}
}
+
+ /**
+ * Set the initWindow value.
+ * @param initWindow The initialization window.
+ */
+ public void setInitWindow(int initWindow)
+ {
+ this.initWindow = initWindow;
+ }
}
--
Gitblit v1.10.0