From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication
---
opends/src/server/org/opends/server/replication/server/ServerReader.java | 151 ++++++++++++++++++++++++++++++++++++++++++--------
1 files changed, 127 insertions(+), 24 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index e61446f..cb66bd2 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -30,6 +30,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
@@ -39,6 +40,7 @@
import org.opends.server.replication.protocol.DoneMessage;
import org.opends.server.replication.protocol.EntryMessage;
import org.opends.server.replication.protocol.ErrorMessage;
+import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.InitializeTargetMessage;
import org.opends.server.replication.protocol.ProtocolSession;
@@ -76,7 +78,7 @@
* Constructor for the LDAP server reader part of the replicationServer.
*
* @param session The ProtocolSession from which to read the data.
- * @param serverId The server ID of the server from which we read changes.
+ * @param serverId The server ID of the server from which we read messages.
* @param handler The server handler for this server reader.
* @param replicationCache The ReplicationCache for this server reader.
*/
@@ -97,14 +99,11 @@
{
if (debugEnabled())
{
- if (handler.isReplicationServer())
- {
- TRACER.debugInfo("Replication server reader starting " + serverId);
- }
- else
- {
- TRACER.debugInfo("LDAP server reader starting " + serverId);
- }
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ (handler.isReplicationServer()?" RS ":" LS")+
+ " reader starting for serverId=" + serverId);
}
/*
* wait on input stream
@@ -116,6 +115,25 @@
{
ReplicationMessage msg = session.receive();
+ if (debugEnabled())
+ {
+ if (handler.isReplicationServer())
+ {
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ "> from RS server with serverId=" + serverId +
+ " receives " + msg);
+ }
+ else
+ {
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ "> from LDAP server with serverId=" + serverId +
+ " receives " + msg);
+ }
+ }
if (msg instanceof AckMessage)
{
AckMessage ack = (AckMessage) msg;
@@ -124,9 +142,22 @@
}
else if (msg instanceof UpdateMessage)
{
- UpdateMessage update = (UpdateMessage) msg;
- handler.decAndCheckWindow();
- replicationCache.put(update, handler);
+ // Ignore update received from a replica with
+ // a bad generation ID
+ long referenceGenerationId = replicationCache.getGenerationId();
+ if ((referenceGenerationId>0) &&
+ (referenceGenerationId != handler.getGenerationId()))
+ {
+ logError(ERR_IGNORING_UPDATE_FROM.get(
+ msg.toString(),
+ handler.getMonitorInstanceName()));
+ }
+ else
+ {
+ UpdateMessage update = (UpdateMessage) msg;
+ handler.decAndCheckWindow();
+ replicationCache.put(update, handler);
+ }
}
else if (msg instanceof WindowMessage)
{
@@ -159,6 +190,11 @@
ErrorMessage errorMsg = (ErrorMessage) msg;
handler.process(errorMsg);
}
+ else if (msg instanceof ResetGenerationId)
+ {
+ ResetGenerationId genIdMsg = (ResetGenerationId) msg;
+ replicationCache.resetGenerationId(this.handler);
+ }
else if (msg instanceof WindowProbe)
{
WindowProbe windowProbeMsg = (WindowProbe) msg;
@@ -168,6 +204,52 @@
{
ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
handler.setReplServerInfo(infoMsg);
+
+ if (debugEnabled())
+ {
+ if (handler.isReplicationServer())
+ TRACER.debugInfo(
+ "In RS " + replicationCache.getReplicationServer().
+ getServerId() +
+ " Receiving replServerInfo from " + handler.getServerId() +
+ " baseDn=" + replicationCache.getBaseDn() +
+ " genId=" + infoMsg.getGenerationId());
+ }
+
+ if (replicationCache.getGenerationId()<0)
+ {
+ // Here is the case where a ReplicationServer receives from
+ // another ReplicationServer the generationId for a domain
+ // for which the generation ID has never been set.
+ replicationCache.setGenerationId(infoMsg.getGenerationId(), false);
+ }
+ else
+ {
+ if (infoMsg.getGenerationId()<0)
+ {
+ // Here is the case where another ReplicationServer
+ // signals that it has no generationId set for the domain.
+ // If we have generationId set locally and no server currently
+ // connected for that domain in the topology then we may also
+ // reset the generationId localy.
+ replicationCache.mayResetGenerationId();
+ }
+
+ if (replicationCache.getGenerationId() != infoMsg.getGenerationId())
+ {
+ Message message = NOTE_BAD_GENERATION_ID.get(
+ replicationCache.getBaseDn().toNormalizedString(),
+ Short.toString(handler.getServerId()),
+ Long.toString(infoMsg.getGenerationId()),
+ Long.toString(replicationCache.getGenerationId()));
+
+ ErrorMessage errorMsg = new ErrorMessage(
+ replicationCache.getReplicationServer().getServerId(),
+ handler.getServerId(),
+ message);
+ session.publish(errorMsg);
+ }
+ }
}
else if (msg == null)
{
@@ -187,21 +269,40 @@
* Log a message and exit from this loop
* So that this handler is stopped.
*/
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " reader IO EXCEPTION serverID=" + serverId
+ + stackTraceToSingleLineString(e) + e.getLocalizedMessage() +
+ e.getCause());
Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
logError(message);
} catch (ClassNotFoundException e)
{
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " reader CNF EXCEPTION serverID=" + serverId
+ + stackTraceToSingleLineString(e));
/*
* The remote server has sent an unknown message,
- * close the conenction.
+ * close the connection.
*/
Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
logError(message);
} catch (Exception e)
{
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " server reader EXCEPTION serverID=" + serverId
+ + stackTraceToSingleLineString(e));
/*
* The remote server has sent an unknown message,
- * close the conenction.
+ * close the connection.
*/
Message message = NOTE_READER_EXCEPTION.get(handler.toString());
logError(message);
@@ -213,6 +314,12 @@
* happen.
* Attempt to close the socket and stop the server handler.
*/
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " reader CLOSE serverID=" + serverId
+ + stackTraceToSingleLineString(new Exception()));
try
{
session.close();
@@ -223,15 +330,11 @@
replicationCache.stopServer(handler);
}
if (debugEnabled())
- {
- if (handler.isReplicationServer())
- {
- TRACER.debugInfo("Replication server reader stopping " + serverId);
- }
- else
- {
- TRACER.debugInfo("LDAP server reader stopping " + serverId);
- }
- }
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ (handler.isReplicationServer()?"RS":"LDAP") +
+ " server reader stopped for serverID=" + serverId
+ + stackTraceToSingleLineString(new Exception()));
}
}
--
Gitblit v1.10.0