From a5c5efbf8ca56c059709953f7fedb647dadaed06 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improves the replica initialization protocol, especially flow control and handling connection outage.

---
 opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java |   74 ++++++++++++++++++++++++++++++++----
 1 files changed, 65 insertions(+), 9 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java b/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java
index 5e8faef..28aab82 100644
--- a/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Copyright 2006-2010 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -41,27 +41,33 @@
 public class InitializeRequestMsg extends RoutableMsg
 {
   private String baseDn = null;
+  private int initWindow = 0;
 
   /**
    * Creates a InitializeRequestMsg message.
    *
-   * @param baseDn The base DN of the replication domain.
+   * @param baseDn      the base DN of the replication domain.
    * @param destination destination of this message
-   * @param serverID serverID of the server that will send this message
+   * @param serverID    serverID of the server that will send this message
+   * @param initWindow  initialization window for flow control
    */
-  public InitializeRequestMsg(String baseDn, int serverID, int destination)
+  public InitializeRequestMsg(String baseDn, int serverID, int destination,
+      int initWindow)
   {
     super(serverID, destination);
     this.baseDn = baseDn;
+    this.initWindow = initWindow; // V4
   }
 
   /**
    * Creates a new InitializeRequestMsg by decoding the provided byte array.
    * @param in A byte array containing the encoded information for the Message
+   * @param version The protocol version to use to decode the msg
    * @throws DataFormatException If the in does not contain a properly
    *                             encoded InitializeMessage.
    */
-  public InitializeRequestMsg(byte[] in) throws DataFormatException
+  public InitializeRequestMsg(byte[] in, short version)
+  throws DataFormatException
   {
     super();
     try
@@ -89,7 +95,14 @@
       destination = Integer.valueOf(destinationServerIdString);
       pos += length +1;
 
-
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        // init window
+        length = getNextLength(in, pos);
+        String initWindowString = new String(in, pos, length, "UTF-8");
+        initWindow = Integer.valueOf(initWindowString);
+        pos += length +1;
+      }
     } catch (UnsupportedEncodingException e)
     {
       throw new DataFormatException("UTF-8 is not supported by this jvm.");
@@ -114,21 +127,40 @@
     }
   }
 
+  // ============
+  // Msg encoding
+  // ============
   /**
    * {@inheritDoc}
    */
   @Override
   public byte[] getBytes()
+  throws UnsupportedEncodingException
+  {
+    return getBytes(ProtocolVersion.getCurrentVersion());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes(short version)
   {
     try {
       byte[] baseDNBytes = baseDn.getBytes("UTF-8");
       byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
-      byte[] destinationBytes = String.valueOf(destination).
-      getBytes("UTF-8");
+      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
+      byte[] initWindowBytes = null;
 
       int length = 1 + baseDNBytes.length + 1 + senderBytes.length + 1
         + destinationBytes.length + 1;
 
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        initWindowBytes = String.valueOf(initWindow).getBytes("UTF-8");
+        length += initWindowBytes.length + 1;
+      }
+
       byte[] resultByteArray = new byte[length];
 
       // type of the operation
@@ -144,6 +176,12 @@
       // destination
       pos = addByteArray(destinationBytes, resultByteArray, pos);
 
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        // init window
+        pos = addByteArray(initWindowBytes, resultByteArray, pos);
+      }
+
       return resultByteArray;
     }
     catch (UnsupportedEncodingException e)
@@ -159,6 +197,24 @@
   public String toString()
   {
     return "InitializeRequestMessage: baseDn="+baseDn+" senderId="+senderID +
-    " destination=" + destination;
+    " destination=" + destination + " initWindow=" + initWindow;
+  }
+
+  /**
+   * Return the initWindow value.
+   * @return the initWindow.
+   */
+  public int getInitWindow()
+  {
+    return this.initWindow;
+  }
+
+  /**
+   * Set the initWindow value.
+   * @param initWindow The initialization window.
+   */
+  public void setInitWindow(int initWindow)
+  {
+    this.initWindow = initWindow;
   }
 }

--
Gitblit v1.10.0