From a5c5efbf8ca56c059709953f7fedb647dadaed06 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improves the replica initialization protocol, especially flow control and handling connection outage.

---
 opends/src/server/org/opends/server/replication/protocol/EntryMsg.java |   75 +++++++++++++++++++++++++++++++++----
 1 files changed, 66 insertions(+), 9 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java b/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
index 270a36c..e6bbbb6 100644
--- a/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Copyright 2006-2010 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -39,6 +39,7 @@
 {
   // The byte array containing the bytes of the entry transported
   private byte[] entryByteArray;
+  private int msgId = -1; // from V4
 
   /**
    * Creates a new EntryMsg.
@@ -46,52 +47,60 @@
    * @param sender      The sender of this message.
    * @param destination The destination of this message.
    * @param entryBytes  The bytes of the entry.
+   * @param msgId       Message counter.
    */
   public EntryMsg(
       int sender,
       int destination,
-      byte[] entryBytes)
+      byte[] entryBytes,
+      int msgId)
   {
     super(sender, destination);
     this.entryByteArray = new byte[entryBytes.length];
     System.arraycopy(entryBytes, 0, this.entryByteArray, 0, entryBytes.length);
+    this.msgId = msgId;
   }
 
   /**
    * Creates a new EntryMsg.
    *
-   * @param serverID The sender of this message.
-   * @param i The destination of this message.
-   * @param entryBytes The bytes of the entry.
+   * @param serverID    The sender of this message.
+   * @param i           The destination of this message.
+   * @param entryBytes  The bytes of the entry.
    * @param pos         The starting Position in the array.
    * @param length      Number of array elements to be copied.
+   * @param msgId       Message counter.
    */
   public EntryMsg(
       int serverID,
       int i,
       byte[] entryBytes,
       int pos,
-      int length)
+      int length,
+      int msgId)
   {
     super(serverID, i);
     this.entryByteArray = new byte[length];
     System.arraycopy(entryBytes, pos, this.entryByteArray, 0, length);
+    this.msgId = msgId;
   }
 
   /**
    * Creates a new EntryMsg from its encoded form.
    *
    * @param in The byte array containing the encoded form of the message.
+   * @param version The protocol version to use to decode the msg
    * @throws DataFormatException If the byte array does not contain a valid
    *                             encoded form of the ServerStartMessage.
    */
-  public EntryMsg(byte[] in) throws DataFormatException
+  public EntryMsg(byte[] in, short version) throws DataFormatException
   {
     try
     {
       /* first byte is the type */
       if (in[0] != MSG_TYPE_ENTRY)
-        throw new DataFormatException("input is not a valid ServerStart msg");
+        throw new DataFormatException("input is not a valid " +
+            this.getClass().getCanonicalName());
       int pos = 1;
 
       // sender
@@ -107,12 +116,22 @@
       pos += length +1;
 
       // entry
-      length = in.length - (pos + 1);
+      length = getNextLength(in, pos);
       this.entryByteArray = new byte[length];
       for (int i=0; i<length; i++)
       {
         entryByteArray[i] = in[pos+i];
       }
+      pos += length +1;
+
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        // msgCnt
+        length = getNextLength(in, pos);
+        String msgcntString = new String(in, pos, length, "UTF-8");
+        this.msgId = Integer.valueOf(msgcntString);
+        pos += length +1;
+      }
     }
     catch (UnsupportedEncodingException e)
     {
@@ -134,16 +153,33 @@
    */
   @Override
   public byte[] getBytes()
+  throws UnsupportedEncodingException
+  {
+    return getBytes(ProtocolVersion.getCurrentVersion());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes(short version)
   {
     try {
       byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
       byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
+      byte[] msgCntBytes = null;
       byte[] entryBytes = entryByteArray;
 
       int length = 1 + senderBytes.length +
                    1 + destinationBytes.length +
                    1 + entryBytes.length + 1;
 
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        msgCntBytes = String.valueOf(msgId).getBytes("UTF-8");
+        length += (1 + msgCntBytes.length);
+      }
+
       byte[] resultByteArray = new byte[length];
 
       /* put the type of the operation */
@@ -153,6 +189,9 @@
       pos = addByteArray(senderBytes, resultByteArray, pos);
       pos = addByteArray(destinationBytes, resultByteArray, pos);
       pos = addByteArray(entryBytes, resultByteArray, pos);
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+        pos = addByteArray(msgCntBytes, resultByteArray, pos);
+
       return resultByteArray;
     }
     catch (UnsupportedEncodingException e)
@@ -160,4 +199,22 @@
       return null;
     }
   }
+
+  /**
+   * Return the msg id.
+   * @return The msg id.
+   */
+  public int getMsgId()
+  {
+    return this.msgId;
+  }
+
+  /**
+   * Set the msg id.
+   * @param msgId The msg id.
+   */
+  public void setMsgId(int msgId)
+  {
+    this.msgId = msgId;
+  }
 }

--
Gitblit v1.10.0