From ed847e95ab009b3f8a7b57636aa3bbe977bf875d Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 19 Oct 2009 07:56:29 +0000
Subject: [PATCH] Fix #4270 ECL Should not establish connections between RSes
---
opends/src/server/org/opends/server/replication/server/ServerReader.java | 88 ++++++++++++++-----------------------------
1 files changed, 29 insertions(+), 59 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 4b5037d..934fe4d 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -60,7 +60,6 @@
private int serverId;
private ProtocolSession session;
private ServerHandler handler;
- private ReplicationServerDomain replicationServerDomain;
/**
* Constructor for the LDAP server reader part of the replicationServer.
@@ -68,20 +67,15 @@
* @param session The ProtocolSession from which to read the data.
* @param serverId The server ID of the server from which we read messages.
* @param handler The server handler for this server reader.
- * @param replicationServerDomain The ReplicationServerDomain for this server
- * reader.
*/
public ServerReader(ProtocolSession session, int serverId,
- ServerHandler handler,
- ReplicationServerDomain replicationServerDomain)
+ ServerHandler handler)
{
- super("Replication Reader Thread for handler of " +
- handler.toString() +
- " in " + replicationServerDomain);
+ super("Replication Reader Thread for RS handler " +
+ handler.getMonitorInstanceName());
this.session = session;
this.serverId = serverId;
this.handler = handler;
- this.replicationServerDomain = replicationServerDomain;
}
/**
@@ -109,15 +103,14 @@
if (debugEnabled())
{
- TRACER.debugInfo("In " + replicationServerDomain + " " +
- getName() + " receives " + msg);
+ TRACER.debugInfo("In " + getName() + " receives " + msg);
}
if (msg instanceof AckMsg)
{
AckMsg ack = (AckMsg) msg;
handler.checkWindow();
- replicationServerDomain.processAck(ack, handler);
+ handler.processAck(ack);
} else if (msg instanceof UpdateMsg)
{
boolean filtered = false;
@@ -141,22 +134,19 @@
if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
(dsStatus == ServerStatus.FULL_UPDATE_STATUS))
{
- long referenceGenerationId =
- replicationServerDomain.getGenerationId();
+ long referenceGenerationId = handler.getReferenceGenId();
if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
- Integer.toString(replicationServerDomain.
- getReplicationServer().getServerId()),
- replicationServerDomain.getBaseDn(),
+ Integer.toString(handler.getReplicationServerId()),
+ handler.getServiceId(),
((UpdateMsg) msg).getChangeNumber().toString(),
Integer.toString(handler.getServerId()),
Long.toString(referenceGenerationId),
Long.toString(handler.getGenerationId())));
if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
- Integer.toString(replicationServerDomain.
- getReplicationServer().getServerId()),
- replicationServerDomain.getBaseDn(),
+ Integer.toString(handler.getReplicationServerId()),
+ handler.getServiceId(),
((UpdateMsg) msg).getChangeNumber().toString(),
Integer.toString(handler.getServerId())));
filtered = true;
@@ -167,17 +157,15 @@
* Ignore updates from RS with bad gen id
* (no system managed status for a RS)
*/
- long referenceGenerationId =
- replicationServerDomain.getGenerationId();
+ long referenceGenerationId =handler.getReferenceGenId();
if ((referenceGenerationId > 0) &&
(referenceGenerationId != handler.getGenerationId()))
{
logError(
ERR_IGNORING_UPDATE_FROM_RS.get(
Integer.toString(
- replicationServerDomain.getReplicationServer().
- getServerId()),
- replicationServerDomain.getBaseDn(),
+ handler.getReplicationServerId()),
+ handler.getServiceId(),
((UpdateMsg) msg).getChangeNumber().toString(),
Integer.toString(handler.getServerId()),
Long.toString(referenceGenerationId),
@@ -190,7 +178,7 @@
{
UpdateMsg update = (UpdateMsg) msg;
handler.decAndCheckWindow();
- replicationServerDomain.put(update, handler);
+ handler.put(update);
}
} else if (msg instanceof WindowMsg)
{
@@ -220,7 +208,7 @@
} else if (msg instanceof ResetGenerationIdMsg)
{
ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
- replicationServerDomain.resetGenerationId(handler, genIdMsg);
+ handler.processResetGenId(genIdMsg);
} else if (msg instanceof WindowProbeMsg)
{
WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
@@ -231,8 +219,7 @@
try
{
ReplicationServerHandler rsh = (ReplicationServerHandler)handler;
- replicationServerDomain.receiveTopoInfoFromRS(topoMsg,
- rsh, true);
+ rsh.receiveTopoInfoFromRS(topoMsg);
}
catch(Exception e)
{
@@ -247,13 +234,13 @@
try
{
DataServerHandler dsh = (DataServerHandler)handler;
- replicationServerDomain.processNewStatus(dsh, csMsg);
+ dsh.receiveNewStatus(csMsg);
}
catch(Exception e)
{
errMessage =
ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(
- replicationServerDomain.getBaseDn(),
+ handler.getServiceId(),
Integer.toString(handler.getServerId()),
csMsg.toString());
logError(errMessage);
@@ -270,8 +257,7 @@
} else if (msg instanceof ChangeTimeHeartbeatMsg)
{
ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg;
- replicationServerDomain.processChangeTimeHeartbeatMsg(handler,
- cthbMsg);
+ handler.process(cthbMsg);
} else if (msg instanceof StopMsg)
{
// Peer server is properly disconnecting: go out of here to
@@ -280,8 +266,7 @@
{
TRACER.debugInfo(handler.toString() + " has properly " +
"disconnected from this replication server " +
- Integer.toString(replicationServerDomain.getReplicationServer().
- getServerId()));
+ Integer.toString(handler.getReplicationServerId()));
}
return;
} else if (msg == null)
@@ -300,9 +285,8 @@
// we just trash the message and log the event for debug purpose,
// then continue receiving messages.
if (debugEnabled())
- TRACER.debugInfo("In " + replicationServerDomain.
- getReplicationServer().
- getMonitorInstanceName() + ":" + e.getMessage());
+ TRACER.debugInfo(
+ "In " + this.getName() + " " + stackTraceToSingleLineString(e));
}
}
}
@@ -315,24 +299,16 @@
*/
if (debugEnabled())
TRACER.debugInfo(
- "In RS " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- " reader IO EXCEPTION for serverID=" + serverId + " " +
- this + " " +
- stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
+ "In " + this.getName() + " " + stackTraceToSingleLineString(e));
errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
- Integer.toString(replicationServerDomain.
- getReplicationServer().getServerId()));
+ Integer.toString(handler.getReplicationServerId()));
logError(errMessage);
}
catch (ClassNotFoundException e)
{
if (debugEnabled())
TRACER.debugInfo(
- "In RS <" + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- " reader CNF EXCEPTION serverID=" + serverId +
- stackTraceToSingleLineString(e));
+ "In " + this.getName() + " " + stackTraceToSingleLineString(e));
/*
* The remote server has sent an unknown message,
* close the connection.
@@ -344,10 +320,7 @@
{
if (debugEnabled())
TRACER.debugInfo(
- "In RS <" + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- " server reader EXCEPTION serverID=" + serverId +
- " " + stackTraceToSingleLineString(e));
+ "In " + this.getName() + " " + stackTraceToSingleLineString(e));
/*
* The remote server has sent an unknown message,
* close the connection.
@@ -364,11 +337,6 @@
*/
try
{
- if (debugEnabled())
- TRACER.debugInfo(
- "In RS " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- this + " is closing the session");
if (handler.getProtocolVersion() >=
ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
@@ -382,12 +350,14 @@
// Anyway, going to close session, so nothing to do
}
}
+ if (debugEnabled())
+ TRACER.debugInfo("In " + this.getName() + " closing the session");
session.close();
} catch (IOException e)
{
// ignore
}
- replicationServerDomain.stopServer(handler);
+ handler.doStop();
if (debugEnabled())
{
TRACER.debugInfo(this.getName() + " stopped " + errMessage);
--
Gitblit v1.10.0