From b45a7bf251b59ef156cfd7f3235384ac8835fcd4 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 24 May 2007 13:14:06 +0000
Subject: [PATCH] [Issue 1085] Synchronization protocol must be extensible
---
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 27 ++++++++++++++++++++++++---
1 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 9a34d92..a098c4b 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -49,6 +49,7 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.HeartbeatThread;
@@ -118,6 +119,8 @@
private int saturationCount = 0;
private short replicationServerId;
+ private short protocolVersion;
+
/**
* The time in milliseconds between heartbeats from the replication
* server. Zero means heartbeats are off.
@@ -146,6 +149,7 @@
super("Server Handler");
this.session = session;
this.maxQueueSize = queueSize;
+ this.protocolVersion = ProtocolVersion.currentVersion();
}
/**
@@ -175,20 +179,27 @@
{
if (baseDn != null)
{
+ // This is an outgoing connection. Publish our start message.
this.baseDn = baseDn;
replicationCache = replicationServer.getReplicationCache(baseDn);
ServerState localServerState = replicationCache.getDbServerState();
ReplServerStartMessage msg =
new ReplServerStartMessage(replicationServerId, replicationServerURL,
- baseDn, windowSize, localServerState);
+ baseDn, windowSize, localServerState,
+ protocolVersion);
session.publish(msg);
}
+ // Wait and process ServerStart or ReplServerStart
ReplicationMessage msg = session.receive();
if (msg instanceof ServerStartMessage)
{
+ // The remote server is an LDAP Server
ServerStartMessage receivedMsg = (ServerStartMessage) msg;
+
+ protocolVersion = ProtocolVersion.minWithCurrent(
+ receivedMsg.getVersion());
serverId = receivedMsg.getServerId();
serverURL = receivedMsg.getServerURL();
this.baseDn = receivedMsg.getBaseDn();
@@ -233,17 +244,22 @@
serverIsLDAPserver = true;
+ // This an incoming connection. Publish our start message
replicationCache = replicationServer.getReplicationCache(this.baseDn);
ServerState localServerState = replicationCache.getDbServerState();
ReplServerStartMessage myStartMsg =
new ReplServerStartMessage(replicationServerId, replicationServerURL,
- this.baseDn, windowSize, localServerState);
+ this.baseDn, windowSize, localServerState,
+ protocolVersion);
session.publish(myStartMsg);
sendWindowSize = receivedMsg.getWindowSize();
}
else if (msg instanceof ReplServerStartMessage)
{
+ // The remote server is a replication server
ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
+ protocolVersion = ProtocolVersion.minWithCurrent(
+ receivedMsg.getVersion());
serverId = receivedMsg.getServerId();
serverURL = receivedMsg.getServerURL();
int separator = serverURL.lastIndexOf(':');
@@ -255,14 +271,19 @@
{
replicationCache = replicationServer.getReplicationCache(this.baseDn);
ServerState serverState = replicationCache.getDbServerState();
+
+ // Publish our start message
ReplServerStartMessage outMsg =
new ReplServerStartMessage(replicationServerId,
replicationServerURL,
- this.baseDn, windowSize, serverState);
+ this.baseDn, windowSize, serverState,
+ protocolVersion);
session.publish(outMsg);
}
else
+ {
this.baseDn = baseDn;
+ }
this.serverState = receivedMsg.getServerState();
sendWindowSize = receivedMsg.getWindowSize();
}
--
Gitblit v1.10.0