From a5c5efbf8ca56c059709953f7fedb647dadaed06 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improves the replica initialization protocol, especially flow control and handling connection outage.

---
 opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java |   90 +++++++++++++++++++++++++++++++++++++++------
 1 files changed, 78 insertions(+), 12 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java b/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
index 8d1c0c1..c1321a6 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
@@ -22,13 +22,14 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Copyright 2006-2010 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 import org.opends.messages.Message;
 
 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.UnsupportedEncodingException;
 import java.util.zip.DataFormatException;
@@ -51,6 +52,10 @@
   // Specifies the complementary details about the error that was detected
   private Message details = null;
 
+  // The time of creation of this message.
+  //                                        protocol version previous to V4
+  private Long creationTime = System.currentTimeMillis();
+
   /**
    * Creates an ErrorMsg providing the destination server.
    *
@@ -64,9 +69,11 @@
     super(sender, destination);
     this.msgID  = details.getDescriptor().getId();
     this.details = details;
+    this.creationTime = System.currentTimeMillis();
 
     if (debugEnabled())
-      TRACER.debugInfo(" Creating error message" + this.toString());
+      TRACER.debugInfo(" Creating error message" + this.toString()
+          + " " + stackTraceToSingleLineString(new Exception("trace")));
   }
 
   /**
@@ -80,6 +87,7 @@
     super(-2, i);
     this.msgID  = details.getDescriptor().getId();
     this.details = details;
+    this.creationTime = System.currentTimeMillis();
 
     if (debugEnabled())
       TRACER.debugInfo(this.toString());
@@ -89,17 +97,20 @@
    * Creates a new ErrorMsg by decoding the provided byte array.
    *
    * @param  in A byte array containing the encoded information for the Message
+   * @param version The protocol version to use to decode the msg.
    * @throws DataFormatException If the in does not contain a properly
    *                             encoded message.
    */
-  public ErrorMsg(byte[] in) throws DataFormatException
+  public ErrorMsg(byte[] in, short version)
+  throws DataFormatException
   {
     super();
     try
     {
       /* first byte is the type */
       if (in[0] != MSG_TYPE_ERROR)
-        throw new DataFormatException("input is not a valid InitializeMessage");
+        throw new DataFormatException("input is not a valid " +
+            this.getClass().getCanonicalName());
       int pos = 1;
 
       // sender
@@ -125,6 +136,14 @@
       details = Message.raw(new String(in, pos, length, "UTF-8"));
       pos += length +1;
 
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        // Creation Time
+        length = getNextLength(in, pos);
+        String creationTimeString = new String(in, pos, length, "UTF-8");
+        creationTime = Long.valueOf(creationTimeString);
+        pos += length +1;
+      }
     }
     catch (UnsupportedEncodingException e)
     {
@@ -133,9 +152,9 @@
   }
 
   /**
-   * Get the base DN from this InitializeMessage.
+   * Get the details from this message.
    *
-   * @return the base DN from this InitializeMessage.
+   * @return the details from this message.
    */
   public Message getDetails()
   {
@@ -143,35 +162,52 @@
   }
 
   /**
-   * Get the base DN from this InitializeMessage.
+   * Get the msgID from this message.
    *
-   * @return the base DN from this InitializeMessage.
+   * @return the msgID from this message.
    */
   public int getMsgID()
   {
     return msgID;
   }
 
+  // ============
+  // Msg encoding
+  // ============
   /**
    * {@inheritDoc}
    */
   @Override
   public byte[] getBytes()
+  throws UnsupportedEncodingException
   {
-    /* The InitializeMessage is stored in the form :
-     * <operation type><basedn><serverid>
-     */
+    return getBytes(ProtocolVersion.getCurrentVersion());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes(short version)
+  {
     try {
       byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
       byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
       byte[] byteErrMsgId = String.valueOf(msgID).getBytes("UTF-8");
       byte[] byteDetails = details.toString().getBytes("UTF-8");
+      byte[] byteCreationTime = null;
 
       int length = 1 + byteSender.length + 1
                      + byteDestination.length + 1
                      + byteErrMsgId.length + 1
                      + byteDetails.length + 1;
 
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        byteCreationTime = creationTime.toString().getBytes("UTF-8");
+        length += byteCreationTime.length + 1;
+      }
+
       byte[] resultByteArray = new byte[length];
 
       // put the type of the operation
@@ -190,6 +226,12 @@
       // details
       pos = addByteArray(byteDetails, resultByteArray, pos);
 
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        // creation time
+        pos = addByteArray(byteCreationTime, resultByteArray, pos);
+      }
+
       return resultByteArray;
     }
     catch (UnsupportedEncodingException e)
@@ -209,6 +251,30 @@
       " sender=" + this.senderID +
       " destination=" + this.destination +
       " msgID=" + this.msgID +
-      " details=" + this.details + "]";
+      " details=" + this.details +
+      " creationTime=" + this.creationTime + "]";
   }
+
+  /**
+   * Get the creation time of this message.
+   * When several attempts of initialization are done sequentially, it helps
+   * sorting the good ones, from the ones that relate to ended initialization
+   * when they are received.
+   *
+   * @return the creation time of this message.
+   */
+  public Long getCreationTime()
+  {
+    return creationTime;
+  }
+
+  /**
+   * Get the creation time of this message.
+   * @param creationTime the creation time of this message.
+   */
+  public void setCreationTime(long creationTime)
+  {
+    this.creationTime = creationTime;
+  }
+
 }

--
Gitblit v1.10.0