From ed847e95ab009b3f8a7b57636aa3bbe977bf875d Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 19 Oct 2009 07:56:29 +0000
Subject: [PATCH] Fix #4270 ECL Should not establish connections between RSes
---
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 98 ++++++++++++++++++++++++++++++++++++++++---------
1 files changed, 80 insertions(+), 18 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 489dbae..784f01b 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -46,13 +46,15 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
@@ -297,7 +299,8 @@
// replication server domain
if (oldGenerationId != -100)
{
- replicationServerDomain.changeGenerationId(oldGenerationId, false);
+ if (replicationServerDomain!=null)
+ replicationServerDomain.changeGenerationId(oldGenerationId, false);
}
}
@@ -363,8 +366,7 @@
writer = new ServerWriter(session, serverId,
this, replicationServerDomain);
- reader = new ServerReader(session, serverId,
- this, replicationServerDomain);
+ reader = new ServerReader(session, serverId, this);
reader.start();
writer.start();
@@ -947,6 +949,20 @@
}
/**
+ * Processes a change time heartbeat msg.
+ *
+ * @param msg The message to be processed.
+ */
+ public void process(ChangeTimeHeartbeatMsg msg)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + this +
+ " processes received msg:\n" + msg);
+ replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg);
+ }
+
+ /**
* Process the reception of a WindowProbeMsg message.
*
* @param windowProbeMsg The message to process.
@@ -1231,8 +1247,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + ":" +
"\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+
"\nAND REPLIED:\n" + outStartMsg.toString());
@@ -1251,8 +1266,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + ":" +
"\nSH START HANDSHAKE SENT("+ this +
"):\n" + outStartMsg.toString()+
@@ -1272,8 +1286,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + ":" +
"\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
"\nAND REPLIED:\n" + outTopoMsg.toString());
@@ -1292,8 +1305,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + ":" +
"\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() +
"\nAND RECEIVED:\n" + inTopoMsg.toString());
@@ -1312,8 +1324,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + " :" +
"\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
"\nAND REPLIED:\n" + outTopoMsg.toString());
@@ -1328,8 +1339,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + " :" +
"\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
}
@@ -1345,11 +1355,63 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + " :" +
"\nSH SESSION HANDSHAKE RECEIVED:\n" +
inStartECLSessionMsg.toString());
}
}
+
+ /**
+ * Process a Ack message received.
+ * @param ack the message received.
+ */
+ public void processAck(AckMsg ack)
+ {
+ if (replicationServerDomain!=null)
+ replicationServerDomain.processAck(ack, this);
+ }
+
+ /**
+ * Get the reference generation id (associated with the changes in the db).
+ * @return the reference generation id.
+ */
+ public long getReferenceGenId()
+ {
+ long refgenid = -1;
+ if (replicationServerDomain!=null)
+ refgenid = replicationServerDomain.getGenerationId();
+ return refgenid;
+ }
+
+ /**
+ * Process a ResetGenerationIdMsg message received.
+ * @param msg the message received.
+ */
+ public void processResetGenId(ResetGenerationIdMsg msg)
+ {
+ if (replicationServerDomain!=null)
+ replicationServerDomain.resetGenerationId(this, msg);
+ }
+
+ /**
+ * Put a new update message received.
+ * @param update the update message received.
+ * @throws IOException when it occurs.
+ */
+ public void put(UpdateMsg update)
+ throws IOException
+ {
+ if (replicationServerDomain!=null)
+ replicationServerDomain.put(update, this);
+ }
+
+ /**
+ * Stop this handler.
+ */
+ public void doStop()
+ {
+ if (replicationServerDomain!=null)
+ replicationServerDomain.stopServer(this);
+ }
}
--
Gitblit v1.10.0