From c02dd7f87e9ba574f06e5cc1eb36ebeb76b9f446 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 08 Oct 2009 16:02:17 +0000
Subject: [PATCH] - Addition of ReplServerStartDSMsg now sent to a DS connecting to a RS  in handshake phase instead of a ReplServerStartMsg. ReplServerStartDSMsg  contains same thing as ReplServerStartMsg but also contains

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java |  108 +++++++++++++++++++++++++++--------------------------
 1 files changed, 55 insertions(+), 53 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index ce95024..489dbae 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,13 +50,14 @@
 import org.opends.server.replication.protocol.AckMsg;
 import org.opends.server.replication.protocol.ErrorMsg;
 import org.opends.server.replication.protocol.HeartbeatThread;
+import org.opends.server.replication.protocol.MonitorMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ReplServerStartMsg;
 import org.opends.server.replication.protocol.RoutableMsg;
 import org.opends.server.replication.protocol.StartECLSessionMsg;
 import org.opends.server.replication.protocol.StartMsg;
 import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.StopMsg;
 import org.opends.server.replication.protocol.TopologyMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.protocol.WindowMsg;
@@ -94,17 +95,21 @@
     if (providedMsg != null)
     {
       if (debugEnabled())
-        TRACER.debugInfo("In "
-            + ((handler!=null)?handler.toString():"Replication Server")
-            + " closing session with err=" +
-            providedMsg.toString());
+        TRACER.debugInfo("In " +
+          ((handler != null) ? handler.toString() : "Replication Server") +
+          " closing session with err=" +
+          providedMsg.toString());
       logError(providedMsg);
     }
     try
     {
-      if (providedSession!=null)
+      if (providedSession != null)
+        // This method is only called when aborting a failing handshake and
+        // not StopMsg should be sent in such situation. StopMsg are only
+        // expected when full handshake has been performed, or at end of
+        // handshake phase 1, when DS was just gathering available RS info
         providedSession.close();
-    } catch (IOException ee)
+    } catch (IOException e)
     {
       // ignore
     }
@@ -174,7 +179,10 @@
   private int rcvWindow;
   private int rcvWindowSizeHalf;
 
-  private int maxRcvWindow;
+  /**
+   * The size of the receiving window.
+   */
+  protected int maxRcvWindow;
   /**
    * Semaphore that the writer uses to control the flow to the remote server.
    */
@@ -197,7 +205,7 @@
    */
   protected long localGenerationId = -1;
   /**
-   * The generation id before procesing a new start handshake.
+   * The generation id before processing a new start handshake.
    */
   protected long oldGenerationId = -1;
   /**
@@ -210,7 +218,7 @@
   protected boolean initSslEncryption;
 
   /**
-   * The SSL encryption after the negociation with the peer.
+   * The SSL encryption after the negotiation with the peer.
    */
   protected boolean sslEncryption;
   /**
@@ -275,17 +283,6 @@
     // be disturbed
     if (session!=null)
     {
-      try
-      {
-        session.publish(
-            new ErrorMsg(
-                replicationServerDomain.getReplicationServer().getServerId(),
-                serverId,
-                reason));
-      }
-      catch(Exception e)
-      {
-      }
       closeSession(session, reason, this);
     }
 
@@ -991,7 +988,14 @@
           replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() + this +
           " publishes message:\n" + msg);
-    session.publish(msg);
+    // Currently only MonitorMsg has to support a backward compatibility
+    if (msg instanceof MonitorMsg)
+    {
+      session.publish(msg, protocolVersion);
+    } else
+    {
+      session.publish(msg);
+    }
   }
 
   /**
@@ -1017,35 +1021,6 @@
   }
 
   /**
-   * Send the ReplServerStartMsg to the remote server (RS or DS).
-   * @param requestedProtocolVersion The provided protocol version.
-   * @return The ReplServerStartMsg sent.
-   * @throws IOException When an exception occurs.
-   */
-  public ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
-  throws IOException
-  {
-    this.localGenerationId = replicationServerDomain.getGenerationId();
-    ReplServerStartMsg outReplServerStartMsg
-    = new ReplServerStartMsg(
-        replicationServerId,
-        replicationServerURL,
-        getServiceId(),
-        maxRcvWindow,
-        replicationServerDomain.getDbServerState(),
-        protocolVersion,
-        localGenerationId,
-        sslEncryption,
-        getLocalGroupId(),
-        replicationServerDomain.
-        getReplicationServer().getDegradedStatusThreshold());
-
-    session.publish(outReplServerStartMsg, requestedProtocolVersion);
-
-    return outReplServerStartMsg;
-  }
-
-  /**
    * Sends the provided TopologyMsg to the peer server.
    *
    * @param topoMsg The TopologyMsg message to be sent.
@@ -1058,7 +1033,7 @@
     // V1 Rs do not support the TopologyMsg
     if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
     {
-      session.publish(topoMsg);
+      session.publish(topoMsg, protocolVersion);
     }
   }
 
@@ -1110,6 +1085,18 @@
 
     if (session != null)
     {
+      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        // V4 protocol introduces a StopMsg to properly end
+        // communications
+        try
+        {
+          session.publish(new StopMsg());
+        } catch (IOException ioe)
+        {
+          // Anyway, going to close session, so nothing to do
+        }
+      }
       // Close session to end ServerReader or ServerWriter
       try
       {
@@ -1328,12 +1315,27 @@
           replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() + ", " +
           this.getClass().getSimpleName() + " " + this + " :" +
-          "\nSH SESSION HANDSHAKE RECEIVED:\n" +
+          "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
           "\nAND REPLIED:\n" + outTopoMsg.toString());
     }
   }
 
   /**
+   * Log stop message has been received.
+   */
+  protected void logStopReceived()
+  {
+    if (debugEnabled())
+    {
+      TRACER.debugInfo("In " +
+          replicationServerDomain.getReplicationServer().
+          getMonitorInstanceName() + ", " +
+          this.getClass().getSimpleName() + " " + this + " :" +
+          "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
+    }
+  }
+
+  /**
    * Log the messages involved in the Topology/StartSession handshake.
    * @param inStartECLSessionMsg The message received first.
    */

--
Gitblit v1.10.0