From ed847e95ab009b3f8a7b57636aa3bbe977bf875d Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 19 Oct 2009 07:56:29 +0000
Subject: [PATCH] Fix #4270 ECL Should not establish connections between RSes

---
 opends/src/server/org/opends/server/replication/server/ServerHandler.java |   98 ++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 80 insertions(+), 18 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 489dbae..784f01b 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -46,13 +46,15 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.RSInfo;
 import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.protocol.ReplicationMsg;
 import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
 import org.opends.server.replication.protocol.ErrorMsg;
 import org.opends.server.replication.protocol.HeartbeatThread;
 import org.opends.server.replication.protocol.MonitorMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
 import org.opends.server.replication.protocol.RoutableMsg;
 import org.opends.server.replication.protocol.StartECLSessionMsg;
 import org.opends.server.replication.protocol.StartMsg;
@@ -297,7 +299,8 @@
     // replication server domain
     if (oldGenerationId != -100)
     {
-      replicationServerDomain.changeGenerationId(oldGenerationId, false);
+      if (replicationServerDomain!=null)
+        replicationServerDomain.changeGenerationId(oldGenerationId, false);
     }
   }
 
@@ -363,8 +366,7 @@
 
       writer = new ServerWriter(session, serverId,
           this, replicationServerDomain);
-      reader = new ServerReader(session, serverId,
-          this, replicationServerDomain);
+      reader = new ServerReader(session, serverId, this);
 
       reader.start();
       writer.start();
@@ -947,6 +949,20 @@
   }
 
   /**
+   * Processes a change time heartbeat msg.
+   *
+   * @param msg The message to be processed.
+   */
+  public void process(ChangeTimeHeartbeatMsg msg)
+  {
+    if (debugEnabled())
+      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
+          getMonitorInstanceName() + this +
+          " processes received msg:\n" + msg);
+    replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg);
+  }
+
+  /**
    * Process the reception of a WindowProbeMsg message.
    *
    * @param  windowProbeMsg The message to process.
@@ -1231,8 +1247,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-        replicationServerDomain.getReplicationServer().
-        getMonitorInstanceName() + ", " +
+        this.replicationServer.getMonitorInstanceName() + ", " +
         this.getClass().getSimpleName() + " " + this + ":" +
         "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+
         "\nAND REPLIED:\n" + outStartMsg.toString());
@@ -1251,8 +1266,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-        replicationServerDomain.getReplicationServer().
-        getMonitorInstanceName() + ", " +
+        this.replicationServer.getMonitorInstanceName() + ", " +
         this.getClass().getSimpleName() + " " + this + ":" +
         "\nSH START HANDSHAKE SENT("+ this +
         "):\n" + outStartMsg.toString()+
@@ -1272,8 +1286,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-          replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() + ", " +
+          this.replicationServer.getMonitorInstanceName() + ", " +
           this.getClass().getSimpleName() + " " + this + ":" +
           "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
           "\nAND REPLIED:\n" + outTopoMsg.toString());
@@ -1292,8 +1305,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-          replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() + ", " +
+          this.replicationServer.getMonitorInstanceName() + ", " +
           this.getClass().getSimpleName() + " " + this + ":" +
           "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() +
           "\nAND RECEIVED:\n" + inTopoMsg.toString());
@@ -1312,8 +1324,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-          replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() + ", " +
+          this.replicationServer.getMonitorInstanceName() + ", " +
           this.getClass().getSimpleName() + " " + this + " :" +
           "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
           "\nAND REPLIED:\n" + outTopoMsg.toString());
@@ -1328,8 +1339,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-          replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() + ", " +
+          this.replicationServer.getMonitorInstanceName() + ", " +
           this.getClass().getSimpleName() + " " + this + " :" +
           "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
     }
@@ -1345,11 +1355,63 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-          replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() + ", " +
+          this.replicationServer.getMonitorInstanceName() + ", " +
           this.getClass().getSimpleName() + " " + this + " :" +
           "\nSH SESSION HANDSHAKE RECEIVED:\n" +
           inStartECLSessionMsg.toString());
     }
   }
+
+  /**
+   * Process a Ack message received.
+   * @param ack the message received.
+   */
+  public void processAck(AckMsg ack)
+  {
+    if (replicationServerDomain!=null)
+      replicationServerDomain.processAck(ack, this);
+  }
+
+  /**
+   * Get the reference generation id (associated with the changes in the db).
+   * @return the reference generation id.
+   */
+  public long getReferenceGenId()
+  {
+    long refgenid = -1;
+    if (replicationServerDomain!=null)
+      refgenid = replicationServerDomain.getGenerationId();
+    return refgenid;
+  }
+
+  /**
+   * Process a ResetGenerationIdMsg message received.
+   * @param msg the message received.
+   */
+  public void processResetGenId(ResetGenerationIdMsg msg)
+  {
+    if (replicationServerDomain!=null)
+      replicationServerDomain.resetGenerationId(this, msg);
+  }
+
+  /**
+   * Put a new update message received.
+   * @param update the update message received.
+   * @throws IOException when it occurs.
+   */
+  public void put(UpdateMsg update)
+  throws IOException
+  {
+    if (replicationServerDomain!=null)
+      replicationServerDomain.put(update, this);
+  }
+
+  /**
+   * Stop this handler.
+   */
+  public void doStop()
+  {
+    if (replicationServerDomain!=null)
+      replicationServerDomain.stopServer(this);
+  }
 }

--
Gitblit v1.10.0