From c02dd7f87e9ba574f06e5cc1eb36ebeb76b9f446 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 08 Oct 2009 16:02:17 +0000
Subject: [PATCH] - Addition of ReplServerStartDSMsg now sent to a DS connecting to a RS  in handshake phase instead of a ReplServerStartMsg. ReplServerStartDSMsg  contains same thing as ReplServerStartMsg but also contains

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java |  120 ++++++++++++++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 100 insertions(+), 20 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 3cd91c4..40fe57d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -145,7 +145,21 @@
           try
           {
             if (session != null)
+            {
+              // V4 protocol introduces a StopMsg to properly close the
+              // connection between servers
+              if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+              {
+                try
+                {
+                  session.publish(new StopMsg());
+                } catch (IOException ioe)
+                {
+                  // Anyway, going to close session, so nothing to do
+                }
+              }
               session.close();
+            }
           } catch (IOException e)
           {
             // ignore
@@ -461,7 +475,7 @@
   {
     TopologyMsg outTopoMsg = replicationServerDomain.createTopologyMsgForDS(
         this.serverId);
-    session.publish(outTopoMsg);
+    session.publish(outTopoMsg, protocolVersion);
     return outTopoMsg;
   }
   /**
@@ -500,14 +514,13 @@
         return;
       }
 
-      //
-      ReplServerStartMsg outReplServerStartMsg = null;
+      StartMsg outStartMsg = null;
       try
       {
-        outReplServerStartMsg = sendStartToRemote(protocolVersion);
+        outStartMsg = sendStartToRemote(protocolVersion);
 
         // log
-        logStartHandshakeRCVandSND(inServerStartMsg, outReplServerStartMsg);
+        logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg);
 
         // The session initiator decides whether to use SSL.
         // Until here session is encrypted then it depends on the negotiation
@@ -517,6 +530,13 @@
         // wait and process StartSessionMsg from remote RS
         StartSessionMsg inStartSessionMsg =
           waitAndProcessStartSessionFromRemoteDS();
+        if (inStartSessionMsg == null)
+        {
+          // DS wants to properly close the connection (DS sent a StopMsg)
+          logStopReceived();
+          abortStart(null);
+          return;
+        }
 
         // Send our own TopologyMsg to remote RS
         TopologyMsg outTopoMsg = sendTopoToRemoteDS();
@@ -525,18 +545,12 @@
       }
       catch(IOException e)
       {
-        // We do not want polluting error log if error is due to normal session
-        // aborted after handshake phase one from a DS that is searching for
-        // best suitable RS.
-
-        // don't log a polluting error when connection aborted
-        // from a DS that wanted only to perform handshake phase 1 in order
-        // to determine the best suitable RS:
-        // 1) -> ServerStartMsg
-        // 2) <- ReplServerStartMsg
-        // 3) connection closure
-
-        throw new DirectoryException(ResultCode.OTHER, null, null);
+        Message errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get(
+          Integer.toString(inServerStartMsg.getServerId()),
+          Integer.toString(replicationServerDomain.getReplicationServer().
+          getServerId()));
+        logError(errMessage);
+        throw new DirectoryException(ResultCode.OTHER, errMessage);
       }
       catch (NotSupportedOldVersionPDUException e)
       {
@@ -578,6 +592,65 @@
         replicationServerDomain.release();
     }
   }
+
+  /**
+   * Send the ReplServerStartDSMsg to the remote DS.
+   * @param requestedProtocolVersion The provided protocol version.
+   * @return The StartMsg sent.
+   * @throws IOException When an exception occurs.
+   */
+  private StartMsg sendStartToRemote(short requestedProtocolVersion)
+  throws IOException
+  {
+    // Before V4 protocol, we sent a ReplServerStartMsg
+    if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V4)
+    {
+
+      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
+      ReplServerStartMsg outReplServerStartMsg
+      = new ReplServerStartMsg(
+          replicationServerId,
+          replicationServerURL,
+          getServiceId(),
+          maxRcvWindow,
+          replicationServerDomain.getDbServerState(),
+          protocolVersion,
+          localGenerationId,
+          sslEncryption,
+          getLocalGroupId(),
+          replicationServerDomain.
+          getReplicationServer().getDegradedStatusThreshold());
+
+      session.publish(outReplServerStartMsg, requestedProtocolVersion);
+
+      return outReplServerStartMsg;
+    }
+    else
+    {
+      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
+      ReplServerStartDSMsg outReplServerStartDSMsg
+      = new ReplServerStartDSMsg(
+          replicationServerId,
+          replicationServerURL,
+          getServiceId(),
+          maxRcvWindow,
+          replicationServerDomain.getDbServerState(),
+          protocolVersion,
+          localGenerationId,
+          sslEncryption,
+          getLocalGroupId(),
+          replicationServerDomain.
+          getReplicationServer().getDegradedStatusThreshold(),
+          replicationServer.getWeight(),
+          replicationServerDomain.getConnectedLDAPservers().size());
+
+
+      session.publish(outReplServerStartDSMsg);
+
+      return outReplServerStartDSMsg;
+    }
+  }
+
   /**
    * Creates a DSInfo structure representing this remote DS.
    * @return The DSInfo structure representing this remote DS
@@ -609,8 +682,10 @@
   }
 
   /**
-   * Wait receiving the StartSessionMsg from the remote DS and process it.
-   * @return the startSessionMsg received
+   * Wait receiving the StartSessionMsg from the remote DS and process it, or
+   * receiving a StopMsg to properly stop the handshake procedure.
+   * @return the startSessionMsg received or null DS sent a stop message to
+   *         not finish the handshake.
    * @throws DirectoryException
    * @throws IOException
    * @throws ClassNotFoundException
@@ -625,7 +700,12 @@
     ReplicationMsg msg = null;
     msg = session.receive();
 
-    if (!(msg instanceof StartSessionMsg))
+    if (msg instanceof StopMsg)
+    {
+      // DS wants to stop handshake (was just for handshake phase one for RS
+      // choice). Return null to make the session be terminated.
+      return null;
+    } else if (!(msg instanceof StartSessionMsg))
     {
       Message message = Message.raw(
           "Protocol error: StartSessionMsg required." + msg + " received.");

--
Gitblit v1.10.0