From b45a7bf251b59ef156cfd7f3235384ac8835fcd4 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 24 May 2007 13:14:06 +0000
Subject: [PATCH] [Issue 1085]  Synchronization protocol must be extensible

---
 opends/src/server/org/opends/server/replication/server/ServerHandler.java |   27 ++++++++++++++++++++++++---
 1 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 9a34d92..a098c4b 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -49,6 +49,7 @@
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.AckMessage;
 import org.opends.server.replication.protocol.ReplServerStartMessage;
 import org.opends.server.replication.protocol.HeartbeatThread;
@@ -118,6 +119,8 @@
   private int saturationCount = 0;
   private short replicationServerId;
 
+  private short protocolVersion;
+
   /**
    * The time in milliseconds between heartbeats from the replication
    * server.  Zero means heartbeats are off.
@@ -146,6 +149,7 @@
     super("Server Handler");
     this.session = session;
     this.maxQueueSize = queueSize;
+    this.protocolVersion = ProtocolVersion.currentVersion();
   }
 
   /**
@@ -175,20 +179,27 @@
     {
       if (baseDn != null)
       {
+        // This is an outgoing connection. Publish our start message.
         this.baseDn = baseDn;
         replicationCache = replicationServer.getReplicationCache(baseDn);
         ServerState localServerState = replicationCache.getDbServerState();
         ReplServerStartMessage msg =
           new ReplServerStartMessage(replicationServerId, replicationServerURL,
-                                    baseDn, windowSize, localServerState);
+                                    baseDn, windowSize, localServerState,
+                                    protocolVersion);
 
         session.publish(msg);
       }
 
+      // Wait and process ServerStart or ReplServerStart
       ReplicationMessage msg = session.receive();
       if (msg instanceof ServerStartMessage)
       {
+        // The remote server is an LDAP Server
         ServerStartMessage receivedMsg = (ServerStartMessage) msg;
+
+        protocolVersion = ProtocolVersion.minWithCurrent(
+            receivedMsg.getVersion());
         serverId = receivedMsg.getServerId();
         serverURL = receivedMsg.getServerURL();
         this.baseDn = receivedMsg.getBaseDn();
@@ -233,17 +244,22 @@
 
         serverIsLDAPserver = true;
 
+        // This an incoming connection. Publish our start message
         replicationCache = replicationServer.getReplicationCache(this.baseDn);
         ServerState localServerState = replicationCache.getDbServerState();
         ReplServerStartMessage myStartMsg =
           new ReplServerStartMessage(replicationServerId, replicationServerURL,
-                                    this.baseDn, windowSize, localServerState);
+                                    this.baseDn, windowSize, localServerState,
+                                    protocolVersion);
         session.publish(myStartMsg);
         sendWindowSize = receivedMsg.getWindowSize();
       }
       else if (msg instanceof ReplServerStartMessage)
       {
+        // The remote server is a replication server
         ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
+        protocolVersion = ProtocolVersion.minWithCurrent(
+            receivedMsg.getVersion());
         serverId = receivedMsg.getServerId();
         serverURL = receivedMsg.getServerURL();
         int separator = serverURL.lastIndexOf(':');
@@ -255,14 +271,19 @@
         {
           replicationCache = replicationServer.getReplicationCache(this.baseDn);
           ServerState serverState = replicationCache.getDbServerState();
+
+          // Publish our start message
           ReplServerStartMessage outMsg =
             new ReplServerStartMessage(replicationServerId,
                                        replicationServerURL,
-                                       this.baseDn, windowSize, serverState);
+                                       this.baseDn, windowSize, serverState,
+                                       protocolVersion);
           session.publish(outMsg);
         }
         else
+        {
           this.baseDn = baseDn;
+        }
         this.serverState = receivedMsg.getServerState();
         sendWindowSize = receivedMsg.getWindowSize();
       }

--
Gitblit v1.10.0