From a5c5efbf8ca56c059709953f7fedb647dadaed06 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improves the replica initialization protocol, especially flow control and handling connection outage.

---
 opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java |   86 +++++++++++++++++++++++++++++++++++++------
 1 files changed, 74 insertions(+), 12 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java b/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
index a477f0c..034246a 100644
--- a/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Copyright 2006-2010 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -47,31 +47,37 @@
   // is related to its own request.
   private int requestorID;
 
+  private int initWindow;
+
   /**
-   * Creates a InitializeDestinationMessage.
+   * Creates a InitializeTargetMsg.
    *
-   * @param baseDN The base DN for which the InitializeMessage is created.
-   * @param serverID The serverID of the server that sends this message.
-   * @param target The destination of this message.
-   * @param target2 The server that initiates this export.
+   * @param baseDN     The base DN for which the InitializeMessage is created.
+   * @param serverID   The serverID of the server that sends this message.
+   * @param target     The destination of this message.
+   * @param target2    The server that initiates this export.
    * @param entryCount The count of entries that will be sent.
+   * @param initWindow the initialization window.
    */
   public InitializeTargetMsg(String baseDN, int serverID,
-      int target, int target2, long entryCount)
+      int target, int target2, long entryCount, int initWindow)
   {
     super(serverID, target);
     this.requestorID = target2;
     this.baseDN = baseDN;
     this.entryCount = entryCount;
+    this.initWindow = initWindow; // V4
   }
 
   /**
    * Creates an InitializeTargetMsg by decoding the provided byte array.
    * @param in A byte array containing the encoded information for the Message
+   * @param version The protocol version to use to decode the msg
    * @throws DataFormatException If the in does not contain a properly
    *                             encoded InitializeMessage.
    */
-  public InitializeTargetMsg(byte[] in) throws DataFormatException
+  public InitializeTargetMsg(byte[] in, short version)
+  throws DataFormatException
   {
     super();
     try
@@ -111,6 +117,14 @@
       entryCount = Long.valueOf(entryCountString);
       pos += length +1;
 
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        // init window
+        length = getNextLength(in, pos);
+        String initWindowString = new String(in, pos, length, "UTF-8");
+        initWindow = Integer.valueOf(initWindowString);
+        pos += length +1;
+      }
     }
     catch (UnsupportedEncodingException e)
     {
@@ -129,9 +143,12 @@
 
   /**
    * Get the serverID of the server that initiated the export.
+   * Roughly it is the server running the task,
+   * - the importer for the Initialize task,
+   * - the exporter for the InitializeRemote task.
    * @return the serverID
    */
-  public long getRequestorID()
+  public long getInitiatorID()
   {
     return this.requestorID;
   }
@@ -143,14 +160,38 @@
    */
   public String getBaseDN()
   {
-    return baseDN;
+    return this.baseDN;
+  }
+
+  /**
+   * Get the initializationWindow.
+   *
+   * @return the initialization window.
+   */
+  public int getInitWindow()
+  {
+    return this.initWindow;
+  }
+
+  // ============
+  // Msg encoding
+  // ============
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes()
+  throws UnsupportedEncodingException
+  {
+    return getBytes(ProtocolVersion.getCurrentVersion());
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public byte[] getBytes()
+  public byte[] getBytes(short version)
+  throws UnsupportedEncodingException
   {
     try
     {
@@ -159,13 +200,19 @@
       byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
       byte[] byteRequestor = String.valueOf(requestorID).getBytes("UTF-8");
       byte[] byteEntryCount = String.valueOf(entryCount).getBytes("UTF-8");
-
+      byte[] byteInitWindow = null;
       int length = 1 + byteDestination.length + 1
                      + byteDn.length + 1
                      + byteSender.length + 1
                      + byteRequestor.length + 1
                      + byteEntryCount.length + 1;
 
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        byteInitWindow = String.valueOf(initWindow).getBytes("UTF-8");
+        length += byteInitWindow.length + 1;
+      }
+
       byte[] resultByteArray = new byte[length];
 
       /* put the type of the operation */
@@ -187,6 +234,12 @@
       /* put the entryCount */
       pos = addByteArray(byteEntryCount, resultByteArray, pos);
 
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        /* put the initWindow */
+        pos = addByteArray(byteInitWindow, resultByteArray, pos);
+      }
+
       return resultByteArray;
     }
     catch (UnsupportedEncodingException e)
@@ -194,4 +247,13 @@
       return null;
     }
   }
+
+  /**
+   * Set the initWindow value.
+   * @param initWindow The initialization window.
+   */
+  public void setInitWindow(int initWindow)
+  {
+    this.initWindow = initWindow;
+  }
 }

--
Gitblit v1.10.0