From 2246119784cdfb6f882eba79ed96d2dd9f56f8f9 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 30 Oct 2008 16:30:36 +0000
Subject: [PATCH] Fix for #3543: Replication protocol incompatibility between v1 and v2: cannot upgrade a running replicated topology from v1 to v2
---
opends/src/server/org/opends/server/replication/server/ServerWriter.java | 9 -
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java | 53 ------
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java | 6
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 93 +++++++++++
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java | 5
opends/src/server/org/opends/server/replication/server/ServerReader.java | 282 ++++++++++++++++++-----------------
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java | 6
opends/src/server/org/opends/server/replication/server/ReplicationData.java | 2
8 files changed, 244 insertions(+), 212 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
index d2e244a..47a33e5 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -350,15 +350,13 @@
public byte[] getBytes(short reqProtocolVersion)
throws UnsupportedEncodingException
{
-
- // Using current protocol version should normally not be done as we would
- // normally call the getBytes() method instead for that. So this check
- // for security
+ // Of course, always support current protocol version
if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
{
return getBytes();
}
+ // Supported older protocol versions
switch (reqProtocolVersion)
{
case ProtocolVersion.REPLICATION_PROTOCOL_V1:
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index a8d347e..5a0decd 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -101,10 +101,7 @@
public byte[] getBytes(short reqProtocolVersion)
throws UnsupportedEncodingException
{
-
- // Using current protocol version should normally not be done as we would
- // normally call the getBytes() method instead for that. So this check
- // for security
+ // Of course, always support current protocol version
if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
{
return getBytes();
diff --git a/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
index f74ff7a..a2ce835 100644
--- a/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -348,15 +348,13 @@
public byte[] getBytes(short reqProtocolVersion)
throws UnsupportedEncodingException
{
-
- // Using current protocol version should normally not be done as we would
- // normally call the getBytes() method instead for that. So this check
- // for security
+ // Of course, always support current protocol version
if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
{
return getBytes();
}
+ // Supported older protocol versions
switch (reqProtocolVersion)
{
case ProtocolVersion.REPLICATION_PROTOCOL_V1:
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationData.java b/opends/src/server/org/opends/server/replication/server/ReplicationData.java
index 455a246..d3bf417 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationData.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationData.java
@@ -50,6 +50,8 @@
public ReplicationData(UpdateMsg change)
throws UnsupportedEncodingException
{
+ // Always keep messages in the replication DB with the current protocol
+ // version
this.setData(change.getBytes());
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index a8ac0c2..238a243 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -728,7 +728,7 @@
// if the 2 RS have different generationID
if (replicationServerDomain.getGenerationIdSavedStatus())
{
- // it the present RS has received changes regarding its
+ // if the present RS has received changes regarding its
// gen ID and so won't change without a reset
// then we are just degrading the peer.
Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
@@ -925,6 +925,97 @@
replicationServerDomain.release();
return;
}
+ } else
+ {
+ // Complete the connection setup with a V1 RS
+
+ // if the remote RS and the local RS have the same genID
+ // then it's ok and nothing else to do
+ if (generationId == localGenerationId)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + " RS V1 with serverID=" + serverId +
+ " is connected with the right generation ID");
+ }
+ } else
+ {
+ if (localGenerationId > 0)
+ {
+ // if the local RS is initialized
+ if (generationId > 0)
+ {
+ // if the remote RS is initialized
+ if (generationId != localGenerationId)
+ {
+ // if the 2 RS have different generationID
+ if (replicationServerDomain.getGenerationIdSavedStatus())
+ {
+ // if the present RS has received changes regarding its
+ // gen ID and so won't change without a reset
+ // then we are just degrading the peer.
+ Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
+ this.baseDn.toNormalizedString(),
+ Short.toString(serverId),
+ Long.toString(generationId),
+ Long.toString(localGenerationId));
+ logError(message);
+ } else
+ {
+ // The present RS has never received changes regarding its
+ // gen ID.
+ //
+ // Example case:
+ // - we are in RS1
+ // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
+ // - RS1 has genId1 from LS1 /genId1 comes from data in
+ // suffix
+ // - we are in RS1 and we receive a START msg from RS2
+ // - Each RS keeps its genID / is degraded and when LS2
+ // will be populated from LS1 everything will become ok.
+ //
+ // Issue:
+ // FIXME : Would it be a good idea in some cases to just
+ // set the gen ID received from the peer RS
+ // specially if the peer has a non null state and
+ // we have a nul state ?
+ // replicationServerDomain.
+ // setGenerationId(generationId, false);
+ Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
+ this.baseDn.toNormalizedString(),
+ Short.toString(serverId),
+ Long.toString(generationId),
+ Long.toString(localGenerationId));
+ logError(message);
+ }
+ }
+ } else
+ {
+ // The remote RS has no genId. We don't change anything for the
+ // current RS.
+ }
+ } else
+ {
+ // The local RS is not initialized - take the one received
+ oldGenerationId =
+ replicationServerDomain.setGenerationId(generationId, false);
+ }
+ }
+
+ // Alright, connected with new incoming V1 RS: store handler.
+ Map<Short, ServerHandler> connectedRSs =
+ replicationServerDomain.getConnectedRSs();
+ connectedRSs.put(serverId, this);
+
+ // Note: the supported scenario for V1->V2 upgrade is to upgrade 1 by 1
+ // all the servers of the topology. We prefer not to send a TopologyMsg
+ // giving partial/false information to the V2 servers as for
+ // instance we don't have the connected DS of the V1 RS...When the V1
+ // RS will be upgraded in its turn, topo info will be sent and accurate.
+ // That way, there is no risk to have false/incomplete information in
+ // other servers.
}
/*
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 7df0c87..78c70fa 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -123,50 +123,80 @@
{
while (true)
{
- ReplicationMsg msg = session.receive();
+ try
+ {
+ ReplicationMsg msg = session.receive();
- /*
- if (debugEnabled())
- {
- TRACER.debugInfo(
- "In RS " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- (handler.isReplicationServer()?" From RS ":" From LS")+
- " with serverId=" + serverId + " receives " + msg);
- }
- */
- if (msg instanceof AckMsg)
- {
- AckMsg ack = (AckMsg) msg;
- handler.checkWindow();
- replicationServerDomain.ack(ack, serverId);
- } else if (msg instanceof UpdateMsg)
- {
- boolean filtered = false;
- /* Ignore updates in some cases */
- if (handler.isLDAPserver())
+ /*
+ if (debugEnabled())
{
- /**
- * Ignore updates from DS in bad BAD_GENID_STATUS or
- * FULL_UPDATE_STATUS
- *
- * The RSD lock should not be taken here as it is acceptable to have
- * a delay between the time the server has a wrong status and the
- * fact we detect it: the updates that succeed to pass during this
- * time will have no impact on remote server. But it is interesting
- * to not saturate uselessly the network if the updates are not
- * necessary so this check to stop sending updates is interesting
- * anyway. Not taking the RSD lock allows to have better
- * performances in normal mode (most of the time).
- */
- ServerStatus dsStatus = handler.getStatus();
- if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
- (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
+ TRACER.debugInfo(
+ "In RS " + replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() +
+ (handler.isReplicationServer()?" From RS ":" From LS")+
+ " with serverId=" + serverId + " receives " + msg);
+ }
+ */
+ if (msg instanceof AckMsg)
+ {
+ AckMsg ack = (AckMsg) msg;
+ handler.checkWindow();
+ replicationServerDomain.ack(ack, serverId);
+ } else if (msg instanceof UpdateMsg)
+ {
+ boolean filtered = false;
+ /* Ignore updates in some cases */
+ if (handler.isLDAPserver())
{
+ /**
+ * Ignore updates from DS in bad BAD_GENID_STATUS or
+ * FULL_UPDATE_STATUS
+ *
+ * The RSD lock should not be taken here as it is acceptable to
+ * have a delay between the time the server has a wrong status and
+ * the fact we detect it: the updates that succeed to pass during
+ * this time will have no impact on remote server. But it is
+ * interesting to not saturate uselessly the network if the
+ * updates are not necessary so this check to stop sending updates
+ * is interesting anyway. Not taking the RSD lock allows to have
+ * better performances in normal mode (most of the time).
+ */
+ ServerStatus dsStatus = handler.getStatus();
+ if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
+ (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
+ {
+ long referenceGenerationId =
+ replicationServerDomain.getGenerationId();
+ if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
+ logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
+ Short.toString(replicationServerDomain.
+ getReplicationServer().getServerId()),
+ replicationServerDomain.getBaseDn().toNormalizedString(),
+ ((UpdateMsg) msg).getChangeNumber().toString(),
+ Short.toString(handler.getServerId()),
+ Long.toString(referenceGenerationId),
+ Long.toString(handler.getGenerationId())));
+ if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
+ logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
+ Short.toString(replicationServerDomain.
+ getReplicationServer().getServerId()),
+ replicationServerDomain.getBaseDn().toNormalizedString(),
+ ((UpdateMsg) msg).getChangeNumber().toString(),
+ Short.toString(handler.getServerId())));
+ filtered = true;
+ }
+ } else
+ {
+ /**
+ * Ignore updates from RS with bad gen id
+ * (no system managed status for a RS)
+ */
long referenceGenerationId =
replicationServerDomain.getGenerationId();
- if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
- logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
+ if ((referenceGenerationId > 0) &&
+ (referenceGenerationId != handler.getGenerationId()))
+ {
+ logError(ERR_IGNORING_UPDATE_FROM_RS.get(
Short.toString(replicationServerDomain.getReplicationServer().
getServerId()),
replicationServerDomain.getBaseDn().toNormalizedString(),
@@ -174,103 +204,86 @@
Short.toString(handler.getServerId()),
Long.toString(referenceGenerationId),
Long.toString(handler.getGenerationId())));
- if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
- logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
- Short.toString(replicationServerDomain.getReplicationServer().
- getServerId()),
- replicationServerDomain.getBaseDn().toNormalizedString(),
- ((UpdateMsg) msg).getChangeNumber().toString(),
- Short.toString(handler.getServerId())));
- filtered = true;
+ filtered = true;
+ }
}
- } else
- {
- /**
- * Ignore updates from RS with bad gen id
- * (no system managed status for a RS)
- */
- long referenceGenerationId =
- replicationServerDomain.getGenerationId();
- if ((referenceGenerationId > 0) &&
- (referenceGenerationId != handler.getGenerationId()))
- {
- logError(ERR_IGNORING_UPDATE_FROM_RS.get(
- Short.toString(replicationServerDomain.getReplicationServer().
- getServerId()),
- replicationServerDomain.getBaseDn().toNormalizedString(),
- ((UpdateMsg) msg).getChangeNumber().toString(),
- Short.toString(handler.getServerId()),
- Long.toString(referenceGenerationId),
- Long.toString(handler.getGenerationId())));
- filtered = true;
- }
- }
- if (!filtered)
+ if (!filtered)
+ {
+ UpdateMsg update = (UpdateMsg) msg;
+ handler.decAndCheckWindow();
+ replicationServerDomain.put(update, handler);
+ }
+ } else if (msg instanceof WindowMsg)
{
- UpdateMsg update = (UpdateMsg) msg;
- handler.decAndCheckWindow();
- replicationServerDomain.put(update, handler);
+ WindowMsg windowMsg = (WindowMsg) msg;
+ handler.updateWindow(windowMsg);
+ } else if (msg instanceof InitializeRequestMsg)
+ {
+ InitializeRequestMsg initializeMsg =
+ (InitializeRequestMsg) msg;
+ handler.process(initializeMsg);
+ } else if (msg instanceof InitializeTargetMsg)
+ {
+ InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
+ handler.process(initializeMsg);
+ } else if (msg instanceof EntryMsg)
+ {
+ EntryMsg entryMsg = (EntryMsg) msg;
+ handler.process(entryMsg);
+ } else if (msg instanceof DoneMsg)
+ {
+ DoneMsg doneMsg = (DoneMsg) msg;
+ handler.process(doneMsg);
+ } else if (msg instanceof ErrorMsg)
+ {
+ ErrorMsg errorMsg = (ErrorMsg) msg;
+ handler.process(errorMsg);
+ } else if (msg instanceof ResetGenerationIdMsg)
+ {
+ ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
+ replicationServerDomain.resetGenerationId(handler, genIdMsg);
+ } else if (msg instanceof WindowProbeMsg)
+ {
+ WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
+ handler.process(windowProbeMsg);
+ } else if (msg instanceof TopologyMsg)
+ {
+ TopologyMsg topoMsg = (TopologyMsg) msg;
+ replicationServerDomain.receiveTopoInfoFromRS(topoMsg,
+ handler, true);
+ } else if (msg instanceof ChangeStatusMsg)
+ {
+ ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
+ replicationServerDomain.processNewStatus(handler, csMsg);
+ } else if (msg instanceof MonitorRequestMsg)
+ {
+ MonitorRequestMsg replServerMonitorRequestMsg =
+ (MonitorRequestMsg) msg;
+ handler.process(replServerMonitorRequestMsg);
+ } else if (msg instanceof MonitorMsg)
+ {
+ MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
+ handler.process(replServerMonitorMsg);
+ } else if (msg == null)
+ {
+ /*
+ * The remote server has sent an unknown message,
+ * close the connection.
+ */
+ Message message = NOTE_READER_NULL_MSG.get(handler.toString());
+ logError(message);
+ return;
}
- } else if (msg instanceof WindowMsg)
+ } catch (NotSupportedOldVersionPDUException e)
{
- WindowMsg windowMsg = (WindowMsg) msg;
- handler.updateWindow(windowMsg);
- } else if (msg instanceof InitializeRequestMsg)
- {
- InitializeRequestMsg initializeMsg =
- (InitializeRequestMsg) msg;
- handler.process(initializeMsg);
- } else if (msg instanceof InitializeTargetMsg)
- {
- InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
- handler.process(initializeMsg);
- } else if (msg instanceof EntryMsg)
- {
- EntryMsg entryMsg = (EntryMsg) msg;
- handler.process(entryMsg);
- } else if (msg instanceof DoneMsg)
- {
- DoneMsg doneMsg = (DoneMsg) msg;
- handler.process(doneMsg);
- } else if (msg instanceof ErrorMsg)
- {
- ErrorMsg errorMsg = (ErrorMsg) msg;
- handler.process(errorMsg);
- } else if (msg instanceof ResetGenerationIdMsg)
- {
- ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
- replicationServerDomain.resetGenerationId(handler, genIdMsg);
- } else if (msg instanceof WindowProbeMsg)
- {
- WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
- handler.process(windowProbeMsg);
- } else if (msg instanceof TopologyMsg)
- {
- TopologyMsg topoMsg = (TopologyMsg) msg;
- replicationServerDomain.receiveTopoInfoFromRS(topoMsg, handler, true);
- } else if (msg instanceof ChangeStatusMsg)
- {
- ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
- replicationServerDomain.processNewStatus(handler, csMsg);
- } else if (msg instanceof MonitorRequestMsg)
- {
- MonitorRequestMsg replServerMonitorRequestMsg =
- (MonitorRequestMsg) msg;
- handler.process(replServerMonitorRequestMsg);
- } else if (msg instanceof MonitorMsg)
- {
- MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
- handler.process(replServerMonitorMsg);
- } else if (msg == null)
- {
- /*
- * The remote server has sent an unknown message,
- * close the conenction.
- */
- Message message = NOTE_READER_NULL_MSG.get(handler.toString());
- logError(message);
- return;
+ // Received a V1 PDU we do not need to support:
+ // we just trash the message and log the event for debug purpose,
+ // then continue receiving messages.
+ if (debugEnabled())
+ TRACER.debugInfo("In " + replicationServerDomain.
+ getReplicationServer().
+ getMonitorInstanceName() + ":" + e.getMessage());
}
}
} catch (IOException e)
@@ -304,13 +317,6 @@
*/
Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
logError(message);
- } catch (NotSupportedOldVersionPDUException e)
- {
- // Received a V1 PDU we do not need to support:
- // we just trash the message and log the event for debug purpose
- if (debugEnabled())
- TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ":" + e.getMessage());
} catch (Exception e)
{
if (debugEnabled())
diff --git a/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 60d06c8..7ad2f45 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -191,14 +191,7 @@
// Publish the update to the remote server using a protocol version he
// it supports
- short pduProtocolVersion = update.getVersion();
- if (protocolVersion < pduProtocolVersion)
- { // The remote server wants a lower protocol version than the PDU one,
- // send it the PDU, serializing it with the supported older version
- session.publish(update, protocolVersion);
- } else {
- session.publish(update);
- }
+ session.publish(update, protocolVersion);
}
}
catch (NoSuchElementException e)
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
index c8014f4..8382644 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -33,7 +33,6 @@
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -50,10 +49,8 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
-import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.AddMsg;
-import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
@@ -353,56 +350,6 @@
}
}
- @Test(enabled=true)
- public void protocolVersion() throws Exception
- {
- logError(Message.raw(
- Category.SYNC, Severity.INFORMATION,
- "Starting Replication ProtocolWindowTest : protocolVersion"));
-
- ReplicationBroker broker = null;
-
- try
- {
- // Test : Make a broker degrade its version when connecting to an old
- // replication server.
- ProtocolVersion.resetCurrentVersion();
-
- broker = new ReplicationBroker(null,
- new ServerState(),
- baseDn,
- (short) 13, 0, 0, 0, 0, 1000, 0,
- ReplicationTestCase.getGenerationId(baseDn),
- getReplSessionSecurity(), (byte)1);
-
-
- // Check broker hard-coded version
- short pversion = broker.getProtocolVersion();
- assertEquals(pversion, ProtocolVersion.getCurrentVersion());
-
- // Connect the broker to the replication server
- ProtocolVersion.setCurrentVersion(ProtocolVersion.REPLICATION_PROTOCOL_V1);
- ArrayList<String> servers = new ArrayList<String>(1);
- servers.add("localhost:" + replServerPort);
- broker.start(servers);
- TestCaseUtils.sleep(3000); // wait for connection established
-
- // Check broker negociated version
- pversion = broker.getProtocolVersion();
- assertEquals(pversion, ProtocolVersion.REPLICATION_PROTOCOL_V1);
-
- logError(Message.raw(
- Category.SYNC, Severity.INFORMATION,
- "Ending Replication ProtocolWindowTest : protocolVersion"));
- } finally
- {
- if (broker != null)
- broker.stop();
-
- ProtocolVersion.resetCurrentVersion();
- }
- }
-
/**
* Clean up the environment.
*
--
Gitblit v1.10.0