From 7ee84fd2ff21c5e25b9a234f974c08b49070fcaf Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 30 Oct 2008 16:30:36 +0000
Subject: [PATCH] Fix for #3543: Replication protocol incompatibility between v1 and v2: cannot upgrade a running replicated topology from v1 to v2
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java | 282 ++++++++++++++++++++++++++++---------------------------
1 files changed, 144 insertions(+), 138 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 7df0c87..78c70fa 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -123,50 +123,80 @@
{
while (true)
{
- ReplicationMsg msg = session.receive();
+ try
+ {
+ ReplicationMsg msg = session.receive();
- /*
- if (debugEnabled())
- {
- TRACER.debugInfo(
- "In RS " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- (handler.isReplicationServer()?" From RS ":" From LS")+
- " with serverId=" + serverId + " receives " + msg);
- }
- */
- if (msg instanceof AckMsg)
- {
- AckMsg ack = (AckMsg) msg;
- handler.checkWindow();
- replicationServerDomain.ack(ack, serverId);
- } else if (msg instanceof UpdateMsg)
- {
- boolean filtered = false;
- /* Ignore updates in some cases */
- if (handler.isLDAPserver())
+ /*
+ if (debugEnabled())
{
- /**
- * Ignore updates from DS in bad BAD_GENID_STATUS or
- * FULL_UPDATE_STATUS
- *
- * The RSD lock should not be taken here as it is acceptable to have
- * a delay between the time the server has a wrong status and the
- * fact we detect it: the updates that succeed to pass during this
- * time will have no impact on remote server. But it is interesting
- * to not saturate uselessly the network if the updates are not
- * necessary so this check to stop sending updates is interesting
- * anyway. Not taking the RSD lock allows to have better
- * performances in normal mode (most of the time).
- */
- ServerStatus dsStatus = handler.getStatus();
- if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
- (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
+ TRACER.debugInfo(
+ "In RS " + replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() +
+ (handler.isReplicationServer()?" From RS ":" From LS")+
+ " with serverId=" + serverId + " receives " + msg);
+ }
+ */
+ if (msg instanceof AckMsg)
+ {
+ AckMsg ack = (AckMsg) msg;
+ handler.checkWindow();
+ replicationServerDomain.ack(ack, serverId);
+ } else if (msg instanceof UpdateMsg)
+ {
+ boolean filtered = false;
+ /* Ignore updates in some cases */
+ if (handler.isLDAPserver())
{
+ /**
+ * Ignore updates from DS in bad BAD_GENID_STATUS or
+ * FULL_UPDATE_STATUS
+ *
+ * The RSD lock should not be taken here as it is acceptable to
+ * have a delay between the time the server has a wrong status and
+ * the fact we detect it: the updates that succeed to pass during
+ * this time will have no impact on remote server. But it is
+ * interesting to not saturate uselessly the network if the
+ * updates are not necessary so this check to stop sending updates
+ * is interesting anyway. Not taking the RSD lock allows to have
+ * better performances in normal mode (most of the time).
+ */
+ ServerStatus dsStatus = handler.getStatus();
+ if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
+ (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
+ {
+ long referenceGenerationId =
+ replicationServerDomain.getGenerationId();
+ if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
+ logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
+ Short.toString(replicationServerDomain.
+ getReplicationServer().getServerId()),
+ replicationServerDomain.getBaseDn().toNormalizedString(),
+ ((UpdateMsg) msg).getChangeNumber().toString(),
+ Short.toString(handler.getServerId()),
+ Long.toString(referenceGenerationId),
+ Long.toString(handler.getGenerationId())));
+ if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
+ logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
+ Short.toString(replicationServerDomain.
+ getReplicationServer().getServerId()),
+ replicationServerDomain.getBaseDn().toNormalizedString(),
+ ((UpdateMsg) msg).getChangeNumber().toString(),
+ Short.toString(handler.getServerId())));
+ filtered = true;
+ }
+ } else
+ {
+ /**
+ * Ignore updates from RS with bad gen id
+ * (no system managed status for a RS)
+ */
long referenceGenerationId =
replicationServerDomain.getGenerationId();
- if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
- logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
+ if ((referenceGenerationId > 0) &&
+ (referenceGenerationId != handler.getGenerationId()))
+ {
+ logError(ERR_IGNORING_UPDATE_FROM_RS.get(
Short.toString(replicationServerDomain.getReplicationServer().
getServerId()),
replicationServerDomain.getBaseDn().toNormalizedString(),
@@ -174,103 +204,86 @@
Short.toString(handler.getServerId()),
Long.toString(referenceGenerationId),
Long.toString(handler.getGenerationId())));
- if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
- logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
- Short.toString(replicationServerDomain.getReplicationServer().
- getServerId()),
- replicationServerDomain.getBaseDn().toNormalizedString(),
- ((UpdateMsg) msg).getChangeNumber().toString(),
- Short.toString(handler.getServerId())));
- filtered = true;
+ filtered = true;
+ }
}
- } else
- {
- /**
- * Ignore updates from RS with bad gen id
- * (no system managed status for a RS)
- */
- long referenceGenerationId =
- replicationServerDomain.getGenerationId();
- if ((referenceGenerationId > 0) &&
- (referenceGenerationId != handler.getGenerationId()))
- {
- logError(ERR_IGNORING_UPDATE_FROM_RS.get(
- Short.toString(replicationServerDomain.getReplicationServer().
- getServerId()),
- replicationServerDomain.getBaseDn().toNormalizedString(),
- ((UpdateMsg) msg).getChangeNumber().toString(),
- Short.toString(handler.getServerId()),
- Long.toString(referenceGenerationId),
- Long.toString(handler.getGenerationId())));
- filtered = true;
- }
- }
- if (!filtered)
+ if (!filtered)
+ {
+ UpdateMsg update = (UpdateMsg) msg;
+ handler.decAndCheckWindow();
+ replicationServerDomain.put(update, handler);
+ }
+ } else if (msg instanceof WindowMsg)
{
- UpdateMsg update = (UpdateMsg) msg;
- handler.decAndCheckWindow();
- replicationServerDomain.put(update, handler);
+ WindowMsg windowMsg = (WindowMsg) msg;
+ handler.updateWindow(windowMsg);
+ } else if (msg instanceof InitializeRequestMsg)
+ {
+ InitializeRequestMsg initializeMsg =
+ (InitializeRequestMsg) msg;
+ handler.process(initializeMsg);
+ } else if (msg instanceof InitializeTargetMsg)
+ {
+ InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
+ handler.process(initializeMsg);
+ } else if (msg instanceof EntryMsg)
+ {
+ EntryMsg entryMsg = (EntryMsg) msg;
+ handler.process(entryMsg);
+ } else if (msg instanceof DoneMsg)
+ {
+ DoneMsg doneMsg = (DoneMsg) msg;
+ handler.process(doneMsg);
+ } else if (msg instanceof ErrorMsg)
+ {
+ ErrorMsg errorMsg = (ErrorMsg) msg;
+ handler.process(errorMsg);
+ } else if (msg instanceof ResetGenerationIdMsg)
+ {
+ ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
+ replicationServerDomain.resetGenerationId(handler, genIdMsg);
+ } else if (msg instanceof WindowProbeMsg)
+ {
+ WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
+ handler.process(windowProbeMsg);
+ } else if (msg instanceof TopologyMsg)
+ {
+ TopologyMsg topoMsg = (TopologyMsg) msg;
+ replicationServerDomain.receiveTopoInfoFromRS(topoMsg,
+ handler, true);
+ } else if (msg instanceof ChangeStatusMsg)
+ {
+ ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
+ replicationServerDomain.processNewStatus(handler, csMsg);
+ } else if (msg instanceof MonitorRequestMsg)
+ {
+ MonitorRequestMsg replServerMonitorRequestMsg =
+ (MonitorRequestMsg) msg;
+ handler.process(replServerMonitorRequestMsg);
+ } else if (msg instanceof MonitorMsg)
+ {
+ MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
+ handler.process(replServerMonitorMsg);
+ } else if (msg == null)
+ {
+ /*
+ * The remote server has sent an unknown message,
+ * close the conenction.
+ */
+ Message message = NOTE_READER_NULL_MSG.get(handler.toString());
+ logError(message);
+ return;
}
- } else if (msg instanceof WindowMsg)
+ } catch (NotSupportedOldVersionPDUException e)
{
- WindowMsg windowMsg = (WindowMsg) msg;
- handler.updateWindow(windowMsg);
- } else if (msg instanceof InitializeRequestMsg)
- {
- InitializeRequestMsg initializeMsg =
- (InitializeRequestMsg) msg;
- handler.process(initializeMsg);
- } else if (msg instanceof InitializeTargetMsg)
- {
- InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
- handler.process(initializeMsg);
- } else if (msg instanceof EntryMsg)
- {
- EntryMsg entryMsg = (EntryMsg) msg;
- handler.process(entryMsg);
- } else if (msg instanceof DoneMsg)
- {
- DoneMsg doneMsg = (DoneMsg) msg;
- handler.process(doneMsg);
- } else if (msg instanceof ErrorMsg)
- {
- ErrorMsg errorMsg = (ErrorMsg) msg;
- handler.process(errorMsg);
- } else if (msg instanceof ResetGenerationIdMsg)
- {
- ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
- replicationServerDomain.resetGenerationId(handler, genIdMsg);
- } else if (msg instanceof WindowProbeMsg)
- {
- WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
- handler.process(windowProbeMsg);
- } else if (msg instanceof TopologyMsg)
- {
- TopologyMsg topoMsg = (TopologyMsg) msg;
- replicationServerDomain.receiveTopoInfoFromRS(topoMsg, handler, true);
- } else if (msg instanceof ChangeStatusMsg)
- {
- ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
- replicationServerDomain.processNewStatus(handler, csMsg);
- } else if (msg instanceof MonitorRequestMsg)
- {
- MonitorRequestMsg replServerMonitorRequestMsg =
- (MonitorRequestMsg) msg;
- handler.process(replServerMonitorRequestMsg);
- } else if (msg instanceof MonitorMsg)
- {
- MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
- handler.process(replServerMonitorMsg);
- } else if (msg == null)
- {
- /*
- * The remote server has sent an unknown message,
- * close the conenction.
- */
- Message message = NOTE_READER_NULL_MSG.get(handler.toString());
- logError(message);
- return;
+ // Received a V1 PDU we do not need to support:
+ // we just trash the message and log the event for debug purpose,
+ // then continue receiving messages.
+ if (debugEnabled())
+ TRACER.debugInfo("In " + replicationServerDomain.
+ getReplicationServer().
+ getMonitorInstanceName() + ":" + e.getMessage());
}
}
} catch (IOException e)
@@ -304,13 +317,6 @@
*/
Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
logError(message);
- } catch (NotSupportedOldVersionPDUException e)
- {
- // Received a V1 PDU we do not need to support:
- // we just trash the message and log the event for debug purpose
- if (debugEnabled())
- TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ":" + e.getMessage());
} catch (Exception e)
{
if (debugEnabled())
--
Gitblit v1.10.0