From 41bef7c0b619c7bc925326451a56071b5736580a Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 19 Jun 2013 08:36:16 +0000
Subject: [PATCH] Fix OPENDJ-986: Exception when reading messages from Replication server RS

---
 opends/src/server/org/opends/server/replication/server/ServerHandler.java |   95 +++++++++++------------------------------------
 1 files changed, 23 insertions(+), 72 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index e6915fd..7be892c 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -49,12 +49,7 @@
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.protocol.AckMsg;
 import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
-import org.opends.server.replication.protocol.ErrorMsg;
-import org.opends.server.replication.protocol.EntryMsg;
-import org.opends.server.replication.protocol.InitializeRequestMsg;
-import org.opends.server.replication.protocol.InitializeTargetMsg;
 import org.opends.server.replication.protocol.HeartbeatThread;
-import org.opends.server.replication.protocol.MonitorMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.ReplicationMsg;
@@ -193,11 +188,6 @@
    * The initial size of the sending window.
    */
   int sendWindowSize;
-
-  /**
-   * The protocol version established with the remote server.
-   */
-  protected short protocolVersion = -1;
   /**
    * remote generation id.
    */
@@ -267,7 +257,6 @@
     super(queueSize, replicationServerURL,
         replicationServerId, replicationServer);
     this.session = session;
-    this.protocolVersion = ProtocolVersion.getCurrentVersion();
     this.rcvWindowSizeHalf = rcvWindowSize / 2;
     this.maxRcvWindow = rcvWindowSize;
     this.rcvWindow = rcvWindowSize;
@@ -399,13 +388,24 @@
 
   /**
    * Sends a message.
-   * @param  msg         The message to be sent.
-   * @throws IOException When it occurs while sending the message,
    *
+   * @param msg
+   *          The message to be sent.
+   * @throws IOException
+   *           When it occurs while sending the message,
    */
-  public void send(ReplicationMsg msg)
-  throws IOException
+  public void send(ReplicationMsg msg) throws IOException
   {
+    /*
+     * Some unit tests include a null domain, so avoid logging anything in that
+     * case.
+     */
+    if (debugEnabled() && replicationServerDomain != null)
+    {
+      TRACER.debugInfo("In "
+          + replicationServerDomain.getReplicationServer()
+              .getMonitorInstanceName() + this + " publishes message:\n" + msg);
+    }
     session.publish(msg);
   }
 
@@ -653,7 +653,7 @@
    */
   public short getProtocolVersion()
   {
-    return protocolVersion;
+    return session.getProtocolVersion();
   }
 
   /**
@@ -950,68 +950,19 @@
   }
 
   /**
-   * Send an InitializeRequestMessage to the server connected through this
-   * handler.
-   *
-   * @param msg The message to be processed
-   * @throws IOException when raised by the underlying session
-   */
-  public void send(RoutableMsg msg) throws IOException
-  {
-    if (debugEnabled())
-      TRACER.debugInfo("In " +
-          replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() + this +
-          " publishes message:\n" + msg);
-
-    // Currently only MonitorMsg has to support a backward compatibility
-    if ((msg instanceof MonitorMsg) || (msg instanceof ErrorMsg) ||
-        (msg instanceof EntryMsg) || (msg instanceof InitializeRequestMsg) ||
-        (msg instanceof InitializeTargetMsg))
-    {
-      session.publish(msg, protocolVersion);
-    } else
-    {
-      session.publish(msg);
-    }
-  }
-
-  /**
-   * Sends an ack message to the server represented by this object.
-   *
-   * @param ack The ack message to be sent.
-   * @throws IOException In case of Exception thrown sending the ack.
-   */
-  public void sendAck(AckMsg ack) throws IOException
-  {
-    session.publish(ack);
-  }
-
-  /**
-   * Send an ErrorMsg to the peer.
-   *
-   * @param errorMsg The message to be sent
-   * @throws IOException when raised by the underlying session
-   */
-  public void sendError(ErrorMsg errorMsg) throws IOException
-  {
-    session.publish(errorMsg);
-  }
-
-  /**
    * Sends the provided TopologyMsg to the peer server.
    *
-   * @param topoMsg The TopologyMsg message to be sent.
-   * @throws IOException When it occurs while sending the message,
-   *
+   * @param topoMsg
+   *          The TopologyMsg message to be sent.
+   * @throws IOException
+   *           When it occurs while sending the message,
    */
-  public void sendTopoInfo(TopologyMsg topoMsg)
-  throws IOException
+  public void sendTopoInfo(TopologyMsg topoMsg) throws IOException
   {
     // V1 Rs do not support the TopologyMsg
-    if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+    if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
     {
-      session.publish(topoMsg, protocolVersion);
+      send(topoMsg);
     }
   }
 

--
Gitblit v1.10.0