From 76b8bb967a0e20ef38750dbffd893baa117c1f34 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 09 Jan 2009 10:06:40 +0000
Subject: [PATCH] - Assured replication (Safe Read) bug fixes - Some assured replication (Safe Read) unit tests
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java | 57 ++
opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java | 25 +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 914 ++++++++++++++++++++++++++++++++++++++++++++----
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 79 ++-
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 28 +
5 files changed, 985 insertions(+), 118 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index df6b067..6d82905 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
@@ -329,7 +329,7 @@
/**
* The update message equivalent to the originally received update message,
* but with assured flag disabled. This message is the one that should be
- * sent to non elligible servers for assured mode.
+ * sent to non eligible servers for assured mode.
* We need a clone like of the original message with assured flag off, to be
* posted to servers we don't want to wait the ack from (not normal status
* servers or servers with different group id). This must be done because
@@ -479,7 +479,7 @@
* The list of servers identified as servers we are interested in
* receiving acks from. If this list is not null, then expectedAcksInfo
* should be not null.
- * Servers that are not in this list are servers not elligible for an ack
+ * Servers that are not in this list are servers not eligible for an ack
* request.
*
*/
@@ -496,7 +496,7 @@
* Process a just received assured update message in Safe Read mode. If the
* ack can be sent immediately, it is done here. This will also determine to
* which suitable servers an ack should be requested from, and which ones are
- * not elligible for an ack request.
+ * not eligible for an ack request.
* This method is an helper method for the put method. Have a look at the put
* method for a better understanding.
* @param update The just received assured update to process.
@@ -516,12 +516,12 @@
List<Short> expectedServers = new ArrayList<Short>();
List<Short> wrongStatusServers = new ArrayList<Short>();
- if (sourceGroupId != groupId)
+ if (sourceGroupId == groupId)
// Assured feature does not cross different group ids
{
if (sourceHandler.isLDAPserver())
{
- // Look for RS elligible for assured
+ // Look for RS eligible for assured
for (ServerHandler handler : replicationServers.values())
{
if (handler.getGroupId() == groupId)
@@ -537,7 +537,7 @@
}
}
- // Look for DS elligible for assured
+ // Look for DS eligible for assured
for (ServerHandler handler : directoryServers.values())
{
// Don't forward the change to the server that just sent it
@@ -560,10 +560,10 @@
wrongStatusServers.add(handler.getServerId());
} else
{
- /*
+ /**
* BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
* We do not want this to be reported as an error to the update
- * maker -> no pollution or potential missunderstanding when
+ * maker -> no pollution or potential misunderstanding when
* reading logs or monitoring and it was just administration (for
* instance new server is being configured in topo: it goes in bad
* gen then then full full update).
@@ -586,7 +586,7 @@
if (preparedAssuredInfo.expectedServers == null)
{
- // No elligible servers found, send the ack immediatly
+ // No eligible servers found, send the ack immediately
AckMsg ack = new AckMsg(cn);
sourceHandler.sendAck(ack);
}
@@ -598,7 +598,7 @@
* Process a just received assured update message in Safe Data mode. If the
* ack can be sent immediately, it is done here. This will also determine to
* which suitable servers an ack should be requested from, and which ones are
- * not elligible for an ack request.
+ * not eligible for an ack request.
* This method is an helper method for the put method. Have a look at the put
* method for a better understanding.
* @param update The just received assured update to process.
@@ -637,20 +637,24 @@
{
if (safeDataLevel == (byte) 1)
{
- // Immediatly return the ack for an assured message in safe data
- // mode with safe data level 1, coming from a DS. No need to wait
- // for more acks
+ /**
+ * Immediately return the ack for an assured message in safe data
+ * mode with safe data level 1, coming from a DS. No need to wait
+ * for more acks
+ */
AckMsg ack = new AckMsg(cn);
sourceHandler.sendAck(ack);
} else
{
if (safeDataLevel != (byte) 0)
{
- // level > 1 : We need further acks
- // The message will be posted in assured mode to elligible
- // servers. The embedded safe data level is not changed, and his
- // value will be used by a remote RS to determine if he must send
- // an ack (level > 1) or not (level = 1)
+ /**
+ * level > 1 : We need further acks
+ * The message will be posted in assured mode to eligible
+ * servers. The embedded safe data level is not changed, and his
+ * value will be used by a remote RS to determine if he must send
+ * an ack (level > 1) or not (level = 1)
+ */
interestedInAcks = true;
} else
{
@@ -661,12 +665,14 @@
{ // A RS sent us the safe data message, for sure no futher acks to wait
if (safeDataLevel == (byte) 1)
{
- // The original level was 1 so the RS that sent us this message
- // should have already sent his ack to the sender DS. Level 1 has
- // already been reached so no further acks to wait.
- // This should not happen in theory as the sender RS server should
- // have sent us a matching not assured message so we should not come
- // to here.
+ /**
+ * The original level was 1 so the RS that sent us this message
+ * should have already sent his ack to the sender DS. Level 1 has
+ * already been reached so no further acks to wait.
+ * This should not happen in theory as the sender RS server should
+ * have sent us a matching not assured message so we should not come
+ * to here.
+ */
} else
{
// level > 1, so Ack this message to originator RS
@@ -682,7 +688,7 @@
{
if (sourceHandler.isLDAPserver())
{
- // Look for RS elligible for assured
+ // Look for RS eligible for assured
for (ServerHandler handler : replicationServers.values())
{
if (handler.getGroupId() == groupId)
@@ -709,7 +715,7 @@
// Some other acks to wait for
int sdl = update.getSafeDataLevel();
int neededAdditionalServers = sdl - 1;
- // Change the number of expected acks if not enough available elligible
+ // Change the number of expected acks if not enough available eligible
// servers: the level is a best effort thing, we do not want to timeout
// at every assured SD update for instance if a RS has had his gen id
// resetted
@@ -721,8 +727,8 @@
preparedAssuredInfo.expectedServers = expectedServers;
} else
{
- // level > 1 and source is a DS but no elligible servers found, send the
- // ack immediatly
+ // level > 1 and source is a DS but no eligible servers found, send the
+ // ack immediately
AckMsg ack = new AckMsg(cn);
sourceHandler.sendAck(ack);
}
@@ -755,8 +761,11 @@
// remove object from the map
return;
}
- // If this is the last ack we were waiting from, immediatly create and
- // send the final ack to the original server
+ /**
+ *
+ * If this is the last ack we were waiting from, immediately create and
+ * send the final ack to the original server
+ */
if (expectedAcksInfo.processReceivedAck(ackingServer, ack))
{
// Remove the object from the map as no more needed
@@ -768,7 +777,7 @@
origServer.sendAck(finalAck);
} catch (IOException e)
{
- /*
+ /**
* An error happened trying the send back an ack to the server.
* Log an error and close the connection to this server.
*/
@@ -794,7 +803,7 @@
/**
* The code run when the timeout occurs while waiting for acks of the
- * elligible servers. This basically sends a timeout ack (with any additional
+ * eligible servers. This basically sends a timeout ack (with any additional
* error info) to the original server that sent an assured update message.
*/
private class AssuredTimeoutTask extends TimerTask
@@ -846,7 +855,7 @@
origServer.sendAck(finalAck);
} catch (IOException e)
{
- /*
+ /**
* An error happened trying the send back an ack to the server.
* Log an error and close the connection to this server.
*/
@@ -2408,7 +2417,7 @@
/**
* Set the purge delay on all the db Handlers for this Domain
- * of Replicaiton.
+ * of Replication.
*
* @param delay The new purge delay to use.
*/
diff --git a/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java b/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java
index e5cb702..f61b938 100644
--- a/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java
+++ b/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java
@@ -22,15 +22,17 @@
* CDDL HEADER END
*
*
- * Copyright 2008 Sun Microsystems, Inc.
+ * Copyright 2008-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
+import java.util.ArrayList;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import java.util.List;
+import java.util.Set;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.AckMsg;
@@ -60,7 +62,7 @@
// The list of server ids that had errors for the sent matching update
// Each server id of the list had one of the
// 3 possible errors (timeout, wrong status or replay error)
- private List<Short> failedServers = null;
+ private List<Short> failedServers = new ArrayList<Short>();
/**
* Number of servers we want an ack from and from which we received the ack.
@@ -210,12 +212,27 @@
ack.setHasTimeout(hasTimeout);
ack.setHasWrongStatus(hasWrongStatus);
ack.setHasReplayError(hasReplayError);
- ack.setFailedServers(failedServers);
- // Force anyway timeout flag if requested
if (timeout)
+ {
+ // Force anyway timeout flag if requested
ack.setHasTimeout(true);
+ // Add servers that did not respond in time
+ Set<Short> serverIds = expectedServersAckStatus.keySet();
+ for (Short serverId : serverIds)
+ {
+ boolean ackReceived = expectedServersAckStatus.get(serverId);
+ if (!ackReceived)
+ {
+ if (!failedServers.contains(serverId))
+ failedServers.add(serverId);
+ }
+ }
+ }
+
+ ack.setFailedServers(failedServers);
+
return ack;
}
}
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 1f54b7f..5c5fef1 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -331,6 +331,29 @@
}
/**
+ * Creates a ReplicationDomain with the provided parameters.
+ * (for unit test purpose only)
+ *
+ * @param serviceID The identifier of the Replication Domain to which
+ * this object is participating.
+ * @param serverID The identifier of the server that is participating
+ * to the Replication Domain.
+ * This identifier should be different for each server that
+ * is participating to a given Replication Domain.
+ * @param serverState The serverState to use
+ */
+ public ReplicationDomain(String serviceID, short serverID,
+ ServerState serverState)
+ {
+ this.serviceID = serviceID;
+ this.serverID = serverID;
+ this.state = serverState;
+ this.generator = new ChangeNumberGenerator(serverID, state);
+
+ domains.put(serviceID, this);
+ }
+
+ /**
* Set the initial status of the domain and perform necessary initializations.
* This method will be called by the Broker each time the ReplicationBroker
* establish a new session to a Replication Server.
@@ -773,8 +796,9 @@
}
numRcvdUpdates.incrementAndGet();
- if (update.isAssured() && (update.getAssuredMode() ==
- AssuredMode.SAFE_READ_MODE))
+ byte rsGroupId = broker.getRsGroupId();
+ if ( update.isAssured() && (update.getAssuredMode() ==
+ AssuredMode.SAFE_READ_MODE) && (rsGroupId == groupId) )
{
receivedAssuredSrUpdates.incrementAndGet();
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index 87284b3..033dbcf 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2008 Sun Microsystems, Inc.
+ * Copyright 2008-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
@@ -935,6 +935,9 @@
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
assertTrue(errorsByServer.isEmpty());
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 1);
@@ -961,6 +964,9 @@
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
assertTrue(errorsByServer.isEmpty());
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1032,6 +1038,9 @@
Integer nError = errorsByServer.get((short)RS_SERVER_ID);
assertNotNull(nError);
assertEquals(nError.intValue(), 1);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1054,6 +1063,9 @@
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
assertTrue(errorsByServer.isEmpty());
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1208,6 +1220,9 @@
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
assertTrue(errorsByServer.isEmpty());
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 1);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1269,6 +1284,9 @@
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
assertTrue(errorsByServer.isEmpty());
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1389,6 +1407,25 @@
assertFalse(ackMsg.hasReplayError());
assertFalse(ackMsg.hasWrongStatus());
assertEquals(ackMsg.getFailedServers().size(), 0);
+
+ // Check for monitoring data
+ DN baseDn = DN.decode(SAFE_READ_DN);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
+ Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
+ assertTrue(errorsByServer.isEmpty());
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 1);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 1);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
+ errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
+ assertTrue(errorsByServer.isEmpty());
} catch (SocketTimeoutException e)
{
// Expected
@@ -1523,6 +1560,9 @@
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
assertTrue(errorsByServer.isEmpty());
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 1);
@@ -1558,6 +1598,9 @@
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
assertTrue(errorsByServer.isEmpty());
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 2);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 2);
@@ -1595,6 +1638,9 @@
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
assertTrue(errorsByServer.isEmpty());
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 3);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 3);
@@ -1676,6 +1722,9 @@
nError = errorsByServer.get((short)20);
assertNotNull(nError);
assertEquals(nError.intValue(), 1);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1718,6 +1767,9 @@
nError = errorsByServer.get((short)30);
assertNotNull(nError);
assertEquals(nError.intValue(), 1);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1760,6 +1812,9 @@
nError = errorsByServer.get((short)RS_SERVER_ID);
assertNotNull(nError);
assertEquals(nError.intValue(), 1);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index e049d81..e0d4512 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -34,6 +34,7 @@
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -97,15 +98,20 @@
private static final short FDS1_ID = 1;
private static final short FDS2_ID = 2;
private static final short FDS3_ID = 3;
+ private static final short FDS4_ID = 4;
+ private static final short FDS5_ID = 5;
private static final short FRS1_ID = 11;
private static final short FRS2_ID = 12;
private static final short FRS3_ID = 13;
+ private static final short DS_FRS2_ID = FRS2_ID + 10;
private static final short RS1_ID = 101;
private static final short RS2_ID = 102;
private static final short RS3_ID = 103;
private FakeReplicationDomain fakeRd1 = null;
private FakeReplicationDomain fakeRd2 = null;
private FakeReplicationDomain fakeRd3 = null;
+ private FakeReplicationDomain fakeRd4 = null;
+ private FakeReplicationDomain fakeRd5 = null;
private FakeReplicationServer fakeRs1 = null;
private FakeReplicationServer fakeRs2 = null;
private FakeReplicationServer fakeRs3 = null;
@@ -138,7 +144,7 @@
*/
// DS receives updates and replies acks with no errors to every updates
private static final int REPLY_OK_DS_SCENARIO = 1;
- // DS receives acks but does not respond (makes timeouts)
+ // DS receives updates but does not respond (makes timeouts)
private static final int TIMEOUT_DS_SCENARIO = 2;
// DS receives updates and replies ack with replay error flags
private static final int REPLAY_ERROR_DS_SCENARIO = 3;
@@ -148,10 +154,17 @@
*/
// RS receives updates and replies acks with no errors to every updates
private static final int REPLY_OK_RS_SCENARIO = 11;
- // RS receives acks but does not respond (makes timeouts)
+ // RS receives updates but does not respond (makes timeouts)
private static final int TIMEOUT_RS_SCENARIO = 12;
// RS is used for sending updates (with sendNewFakeUpdate()) and receive acks, synchronously
- private static final int SENDER_RS_SCENARIO = 13;
+ private static final int SENDER_RS_SCENARIO = 13;
+ // Scenarios only used in safe read tests:
+ // RS receives updates and replies ack error as if a DS was connected to it and timed out
+ private static final int DS_TIMEOUT_RS_SCENARIO_SAFE_READ = 14;
+ // RS receives updates and replies ack error as if a DS was connected to it and was wrong status
+ private static final int DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ = 15;
+ // RS receives updates and replies ack error as if a DS was connected to it and had a replay error
+ private static final int DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ = 16;
private void debugInfo(String s)
{
@@ -188,6 +201,8 @@
fakeRd1 = null;
fakeRd2 = null;
fakeRd3 = null;
+ fakeRd4 = null;
+ fakeRd5 = null;
fakeRs1 = null;
fakeRs2 = null;
fakeRs3 = null;
@@ -218,6 +233,18 @@
fakeRd3 = null;
}
+ if (fakeRd4 != null)
+ {
+ fakeRd4.disableService();
+ fakeRd4 = null;
+ }
+
+ if (fakeRd5 != null)
+ {
+ fakeRd5.disableService();
+ fakeRd5 = null;
+ }
+
// Shutdown fake RSs
if (fakeRs1 != null)
@@ -260,6 +287,18 @@
rs3 = null;
}
}
+ /**
+ * Creates and connects a new fake replication domain, using the passed scenario
+ * (no server state constructor version)
+ */
+ private FakeReplicationDomain createFakeReplicationDomain(short serverId,
+ int groupId, short rsId, long generationId, boolean assured,
+ AssuredMode assuredMode, int safeDataLevel, long assuredTimeout,
+ int scenario)
+ {
+ return createFakeReplicationDomain(serverId, groupId, rsId, generationId, assured,
+ assuredMode, safeDataLevel, assuredTimeout, scenario, new ServerState());
+ }
/**
* Creates and connects a new fake replication domain, using the passed scenario.
@@ -267,7 +306,7 @@
private FakeReplicationDomain createFakeReplicationDomain(short serverId,
int groupId, short rsId, long generationId, boolean assured,
AssuredMode assuredMode, int safeDataLevel, long assuredTimeout,
- int scenario)
+ int scenario, ServerState serverState)
{
try
{
@@ -290,7 +329,8 @@
FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain(
TEST_ROOT_DN_STRING, serverId, "localhost:" + rsPort, generationId,
- (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout, scenario);
+ (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout,
+ scenario, serverState);
// Test connection
assertTrue(fakeReplicationDomain.isConnected());
@@ -323,7 +363,7 @@
*/
private FakeReplicationServer createFakeReplicationServer(short serverId,
int groupId, short rsId, long generationId, boolean assured,
- AssuredMode assuredMode, int safeDataLevel, int scenario)
+ AssuredMode assuredMode, int safeDataLevel, ServerState serverState, int scenario)
{
try
{
@@ -349,7 +389,7 @@
TEST_ROOT_DN_STRING, generationId);
// Connect fake RS to the real RS
- assertTrue(fakeReplicationServer.connect());
+ assertTrue(fakeReplicationServer.connect(serverState));
// Start wished scenario
fakeReplicationServer.start(scenario);
@@ -375,7 +415,7 @@
if (serverId == RS1_ID)
{
port = rs1Port;
- if (testCase.equals("testSafeDataManyRealRSs"))
+ if (testCase.equals("testSafeDataManyRealRSs") || testCase.equals("testSafeReadOneManyRSsAndDSs"))
{
// Every 3 RSs connected together
replServers.add("localhost:" + rs2Port);
@@ -387,7 +427,7 @@
} else if (serverId == RS2_ID)
{
port = rs2Port;
- if (testCase.equals("testSafeDataManyRealRSs"))
+ if (testCase.equals("testSafeDataManyRealRSs") || testCase.equals("testSafeReadOneManyRSsAndDSs"))
{
// Every 3 RSs connected together
replServers.add("localhost:" + rs1Port);
@@ -399,7 +439,7 @@
} else if (serverId == RS3_ID)
{
port = rs3Port;
- if (testCase.equals("testSafeDataManyRealRSs"))
+ if (testCase.equals("testSafeDataManyRealRSs") || testCase.equals("testSafeReadOneManyRSsAndDSs"))
{
// Every 3 RSs connected together
replServers.add("localhost:" + rs1Port);
@@ -448,6 +488,8 @@
// Number of received updates
private int nReceivedUpdates = 0;
+ private boolean sameGidAsRs = true;
+
/**
* Creates a fake replication domain (DS)
* @param serviceID The base dn used at connection to RS
@@ -460,7 +502,7 @@
* @param safeDataLevel the expected safe data level of the incoming updates (also used for outgoing updates)
* @param assuredTimeout the assured timeout used when sending updates
* @param scenario the scenario we are creating for (implies particular
- * behaviour upon reception of updates)
+ * behavior upon reception of updates)
* @throws org.opends.server.config.ConfigException
*/
public FakeReplicationDomain(
@@ -473,9 +515,10 @@
AssuredMode assuredMode,
byte safeDataLevel,
long assuredTimeout,
- int scenario) throws ConfigException
+ int scenario,
+ ServerState serverState) throws ConfigException
{
- super(serviceID, serverID);
+ super(serviceID, serverID, serverState);
List<String> replicationServers = new ArrayList<String>();
replicationServers.add(replicationServer);
this.generationId = generationId;
@@ -544,42 +587,40 @@
@Override
public boolean processUpdate(UpdateMsg updateMsg)
{
- try
- {
- checkUpdateAssuredParameters(updateMsg);
- nReceivedUpdates++;
- // Now execute the requested scenario
- AckMsg ackMsg = null;
- switch (scenario)
- {
- case REPLY_OK_DS_SCENARIO:
- // Send the ack without errors
- ackMsg = new AckMsg(updateMsg.getChangeNumber());
- session.publish(ackMsg);
- break;
- case TIMEOUT_DS_SCENARIO:
- // Let timeout occur
- break;
- case REPLAY_ERROR_DS_SCENARIO:
- // Send the ack with replay error
- ackMsg = new AckMsg(updateMsg.getChangeNumber());
- ackMsg.setHasReplayError(true);
- List<Short> failedServers = new ArrayList<Short>();
- failedServers.add(getServerId());
- ackMsg.setFailedServers(failedServers);
- session.publish(ackMsg);
- break;
- default:
- fail("Unknown scenario: " + scenario);
- }
- return true;
- } catch (IOException ex)
+ checkUpdateAssuredParameters(updateMsg);
+ nReceivedUpdates++;
+
+ // Now execute the requested scenario
+ switch (scenario)
{
- fail("IOException in fake replication domain " + getServerId() + " :" +
- ex.getMessage());
- return false;
+ case REPLY_OK_DS_SCENARIO:
+ // Send the ack without errors
+ // Call processUpdateDone and update the server state is what needs to
+ // be done when using asynchronous process update mechanism
+ // (see processUpdate javadoc)
+ processUpdateDone(updateMsg, null);
+ getServerState().update(updateMsg.getChangeNumber());
+ break;
+ case TIMEOUT_DS_SCENARIO:
+ // Let timeout occur
+ break;
+ case REPLAY_ERROR_DS_SCENARIO:
+ // Send the ack with replay error
+ // Call processUpdateDone and update the server state is what needs to
+ // be done when using asynchronous process update mechanism
+ // (see processUpdate javadoc)
+ processUpdateDone(updateMsg, "This is the replay error message generated from fake DS " +
+ getServerId() + " for update with change number " + updateMsg.
+ getChangeNumber());
+ getServerState().update(updateMsg.getChangeNumber());
+ break;
+ default:
+ fail("Unknown scenario: " + scenario);
}
+ // IMPORTANT: return false so that we use the asynchronous processUpdate mechanism
+ // (see processUpdate javadoc)
+ return false;
}
/**
@@ -663,7 +704,7 @@
// Number of received updates
private int nReceivedUpdates = 0;
- // True is an ack has been replied to a received assured update (in assured mode of course)
+ // True if an ack has been replied to a received assured update (in assured mode of course)
// used in reply scenario
private boolean ackReplied = false;
@@ -741,7 +782,7 @@
* Connect to RS
* Returns true if connection was made successfully
*/
- public boolean connect()
+ public boolean connect(ServerState serverState)
{
try
{
@@ -761,7 +802,7 @@
// Send our repl server start msg
ReplServerStartMsg replServerStartMsg = new ReplServerStartMsg(serverId,
- fakeUrl, baseDn, 100, new ServerState(),
+ fakeUrl, baseDn, 100, serverState,
ProtocolVersion.getCurrentVersion(), generationId, sslEncryption,
groupId, 5000);
session.publish(replServerStartMsg);
@@ -857,6 +898,47 @@
case TIMEOUT_RS_SCENARIO:
// Let timeout occur
break;
+ case DS_TIMEOUT_RS_SCENARIO_SAFE_READ:
+ if (updateMsg.isAssured())
+ {
+ // Emulate RS waiting for virtual DS ack
+ sleep(MAX_SEND_UPDATE_TIME);
+ // Send the ack with timeout error from a virtual DS with id (ours + 10)
+ AckMsg ackMsg = new AckMsg(updateMsg.getChangeNumber());
+ ackMsg.setHasTimeout(true);
+ List<Short> failedServers = new ArrayList<Short>();
+ failedServers.add((short)(serverId + 10));
+ ackMsg.setFailedServers(failedServers);
+ session.publish(ackMsg);
+ ackReplied = true;
+ }
+ break;
+ case DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ:
+ if (updateMsg.isAssured())
+ {
+ // Send the ack with wrong status error from a virtual DS with id (ours + 10)
+ AckMsg ackMsg = new AckMsg(updateMsg.getChangeNumber());
+ ackMsg.setHasWrongStatus(true);
+ List<Short> failedServers = new ArrayList<Short>();
+ failedServers.add((short)(serverId + 10));
+ ackMsg.setFailedServers(failedServers);
+ session.publish(ackMsg);
+ ackReplied = true;
+ }
+ break;
+ case DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ:
+ if (updateMsg.isAssured())
+ {
+ // Send the ack with replay error from a virtual DS with id (ours + 10)
+ AckMsg ackMsg = new AckMsg(updateMsg.getChangeNumber());
+ ackMsg.setHasReplayError(true);
+ List<Short> failedServers = new ArrayList<Short>();
+ failedServers.add((short)(serverId + 10));
+ ackMsg.setFailedServers(failedServers);
+ session.publish(ackMsg);
+ ackReplied = true;
+ }
+ break;
default:
fail("Unknown scenario: " + scenario);
}
@@ -910,7 +992,7 @@
}
/**
- * Check that received update assured parameters are as defined at DS start
+ * Check that received update assured parameters are as defined at RS start
*/
private void checkUpdateAssuredParameters(UpdateMsg updateMsg)
{
@@ -948,7 +1030,7 @@
}
/**
- * Test if the last received updates was acknowledged
+ * Test if the last received updates was acknowledged (ack sent with or without errors)
* WARNING: this must be called once per update as it also immediatly resets the status
* for a new test for the next update
* @return True if acknowledged
@@ -1100,7 +1182,7 @@
// this would timeout. If main DS group id is not the same as the real RS one,
// the update will even not come to real RS as asured
fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRsGid, RS1_ID,
- DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, TIMEOUT_RS_SCENARIO);
+ DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, new ServerState(), TIMEOUT_RS_SCENARIO);
assertNotNull(fakeRs1);
}
@@ -1120,7 +1202,7 @@
long sendUpdateTime = System.currentTimeMillis() - startTime;
assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
if (mainDsGid == DEFAULT_GID)
{
// Check monitoring values (check that ack has been correctly received)
@@ -1139,7 +1221,7 @@
}
// Sanity check
- sleep(1000); // Let time to update to reach other servers
+ sleep(500); // Let time to update to reach other servers
assertEquals(fakeRd1.nReceivedUpdates(), 0);
assertTrue(fakeRd1.receivedUpdatesOk());
if (otherFakeDS)
@@ -1435,24 +1517,24 @@
// Put a fake RS 1 connected to real RS
fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRs1Gid, RS1_ID,
fakeRs1GenId, ((fakeRs1Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
- fakeRs1Scen);
+ new ServerState(), fakeRs1Scen);
assertNotNull(fakeRs1);
// Put a fake RS 2 connected to real RS
fakeRs2 = createFakeReplicationServer(FRS2_ID, fakeRs2Gid, RS1_ID,
fakeRs2GenId, ((fakeRs2Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
- fakeRs2Scen);
+ new ServerState(), fakeRs2Scen);
assertNotNull(fakeRs2);
// Put a fake RS 3 connected to real RS
fakeRs3 = createFakeReplicationServer(FRS3_ID, fakeRs3Gid, RS1_ID,
fakeRs3GenId, ((fakeRs3Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
- fakeRs3Scen);
+ new ServerState(), fakeRs3Scen);
assertNotNull(fakeRs3);
// Wait for connections to be finished
// DS must see expected numbers of fake DSs and RSs
- waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 3);
+ waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 4);
/***********************************************************************
* Send update from DS 1 (3 fake RSs available) and check what happened
@@ -1479,9 +1561,9 @@
long sendUpdateTime = System.currentTimeMillis() - startTime;
// Check
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+ sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
checkTimeAndMonitoringSafeData(1, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
- checkWhatHasBeenReceived(1, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, fakeRs3GenId, expectedServers);
+ checkWhatHasBeenReceivedSafeData(1, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, fakeRs3GenId, expectedServers);
/***********************************************************************
* Send update from DS 1 (2 fake RSs available) and check what happened
@@ -1493,7 +1575,7 @@
// Wait for disconnection to be finished
// DS must see expected numbers of fake DSs and RSs
- waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 2);
+ waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 3);
// Keep track of monitoring values for incremental test step
acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
@@ -1516,9 +1598,9 @@
sendUpdateTime = System.currentTimeMillis() - startTime;
// Check
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+ sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
checkTimeAndMonitoringSafeData(2, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
- checkWhatHasBeenReceived(2, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, -1L, expectedServers);
+ checkWhatHasBeenReceivedSafeData(2, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, -1L, expectedServers);
/***********************************************************************
* Send update from DS 1 (1 fake RS available) and check what happened
@@ -1530,7 +1612,7 @@
// Wait for disconnection to be finished
// DS must see expected numbers of fake DSs and RSs
- waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 1);
+ waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 2);
// Keep track of monitoring values for incremental test step
acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
@@ -1553,9 +1635,9 @@
sendUpdateTime = System.currentTimeMillis() - startTime;
// Check
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+ sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
checkTimeAndMonitoringSafeData(3, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
- checkWhatHasBeenReceived(3, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, -1L, -1L, expectedServers);
+ checkWhatHasBeenReceivedSafeData(3, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, -1L, -1L, expectedServers);
/***********************************************************************
* Send update from DS 1 (no fake RS available) and check what happened
@@ -1567,7 +1649,7 @@
// Wait for disconnection to be finished
// DS must see expected numbers of fake DSs and RSs
- waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 0);
+ waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 1);
// Keep track of monitoring values for incremental test step
acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
@@ -1590,9 +1672,9 @@
sendUpdateTime = System.currentTimeMillis() - startTime;
// Check
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+ sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
checkTimeAndMonitoringSafeData(4, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
- checkWhatHasBeenReceived(4, otherFakeDS, otherFakeDsGenId, -1L, -1L, -1L, expectedServers);
+ checkWhatHasBeenReceivedSafeData(4, otherFakeDS, otherFakeDsGenId, -1L, -1L, -1L, expectedServers);
} finally
{
endTest();
@@ -1602,7 +1684,7 @@
// Check that the DSs and the fake RSs of the topology have received/acked what is expected according to the
// test step (the number of updates)
// -1 for a gen id means no need to test the matching fake RS
- private void checkWhatHasBeenReceived(int nSentUpdates, boolean otherFakeDS, long otherFakeDsGenId, long fakeRs1GenId, long fakeRs2GenId, long fakeRs3GenId, List<Short> expectedServers)
+ private void checkWhatHasBeenReceivedSafeData(int nSentUpdates, boolean otherFakeDS, long otherFakeDsGenId, long fakeRs1GenId, long fakeRs2GenId, long fakeRs3GenId, List<Short> expectedServers)
{
// We should not receive our own update
@@ -1805,7 +1887,7 @@
rsInfo = fakeRd.getRsList();
nDs = dsInfo.size();
nRs = rsInfo.size();
- if ( (nDs == expectedDs) && (nRs == (expectedRs+1)) ) // Must include real RS so '+1'
+ if ( (nDs == expectedDs) && (nRs == expectedRs) ) // Must include real RS so '+1'
{
debugInfo("waitForStableTopo: expected topo obtained after " + (30-nSec) + " second(s).");
return;
@@ -1817,7 +1899,7 @@
" DSs (had " + dsInfo +") and " + expectedRs + " RSs (had " + rsInfo +").");
}
- // Compute the list of servers that are elligible for receiving an assured update
+ // Compute the list of servers that are elligible for receiving a safe data assured update
// according to their group id and generation id. If -1 is used, the server is out of scope
private List<Short> computeElligibleServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs2Gid, long fakeRs2GenId, int fakeRs3Gid, long fakeRs3GenId)
{
@@ -1847,7 +1929,7 @@
return false;
}
- // Compute the list of servers that are elligible for receiving an assured update and that are expected to effectively ack the update
+ // Compute the list of servers that are elligible for receiving a safe data assured update and that are expected to effectively ack the update
// If -1 is used, the server is out of scope
private List<Short> computeExpectedServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen, int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen)
{
@@ -1946,7 +2028,7 @@
// Put a fake RS 2 connected to real RS
fakeRs2 = createFakeReplicationServer(FRS2_ID, DEFAULT_GID, RS1_ID,
DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 10,
- TIMEOUT_RS_SCENARIO);
+ new ServerState(), TIMEOUT_RS_SCENARIO);
assertNotNull(fakeRs2);
/*
@@ -1956,7 +2038,7 @@
// Put a fake RS 1 connected to real RS
fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRsGid, RS1_ID,
fakeRsGenId, sendInAssured, AssuredMode.SAFE_DATA_MODE, sdLevel,
- SENDER_RS_SCENARIO);
+ new ServerState(), SENDER_RS_SCENARIO);
assertNotNull(fakeRs1);
/*
@@ -2068,7 +2150,7 @@
// Wait for RSs connections to be finished
// DS must see expected numbers of RSs
- waitForStableTopo(fakeRd1, 0, 2);
+ waitForStableTopo(fakeRd1, 0, 3);
/*
* Send update from DS 1 and check result
@@ -2088,6 +2170,7 @@
assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
// Check monitoring values (check that ack has been correctly received)
+ sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
assertEquals(fakeRd1.getAssuredSdSentUpdates(), 1);
assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), 1);
assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), 0);
@@ -2097,5 +2180,684 @@
endTest();
}
}
+
+ /**
+ * Test safe read mode with only one real RS deployment. One fake DS sends
+ * assured messages to one other fake DS connected to the RS a fake RS
+ * connected to the real RS is also expected to send the ack
+ */
+ @Test(enabled = true)
+ public void testSafeReadOneRSBasic() throws Exception
+ {
+ String testCase = "testSafeReadOneRSBasic";
+
+ debugInfo("Starting " + testCase);
+
+ initTest();
+
+ try
+ {
+ /*******************
+ * Start real RS (the one to be tested)
+ */
+
+ // Create real RS 1
+ rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
+ testCase);
+ assertNotNull(rs1);
+
+ /*******************
+ * Start main DS 1 (the one which sends updates)
+ */
+
+ // Create and connect DS 1 to RS 1
+ // Assured mode: SR
+ fakeRd1 = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
+ DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
+ TIMEOUT_DS_SCENARIO);
+ assertNotNull(fakeRd1);
+
+ /*
+ * Send a first assured safe read update
+ */
+
+ long startTime = System.currentTimeMillis();
+ try
+ {
+ fakeRd1.sendNewFakeUpdate();
+ } catch (TimeoutException e)
+ {
+ fail("No timeout is expected here");
+ }
+ long sendUpdateTime = System.currentTimeMillis() - startTime;
+
+ // Check call time (should be short as RS should have acked)
+ assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+ // Check monitoring values (check that ack has been correctly received)
+ sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
+ assertEquals(fakeRd1.getAssuredSrSentUpdates(), 1);
+ assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 1);
+ assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+ // Sanity check
+ sleep(500); // Let time to update to reach servers
+ assertEquals(fakeRd1.nReceivedUpdates(), 0);
+ assertTrue(fakeRd1.receivedUpdatesOk());
+
+ /*******************
+ * Start another fake DS 2 connected to RS
+ */
+
+ // Create and connect DS 2 to RS 1
+ // Assured mode: SR
+ ServerState serverState = fakeRd1.getServerState();
+ fakeRd2 = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
+ DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
+ REPLY_OK_DS_SCENARIO, serverState);
+ assertNotNull(fakeRd2);
+
+ // Wait for connections to be established
+ waitForStableTopo(fakeRd1, 1, 1);
+
+ /*
+ * Send a second assured safe read update
+ */
+
+ startTime = System.currentTimeMillis();
+ try
+ {
+ fakeRd1.sendNewFakeUpdate();
+ } catch (TimeoutException e)
+ {
+ fail("No timeout is expected here");
+ }
+ sendUpdateTime = System.currentTimeMillis() - startTime;
+
+ // Check call time (should be short as RS should have acked)
+ assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+ // Check monitoring values (check that ack has been correctly received)
+ sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
+ assertEquals(fakeRd1.getAssuredSrSentUpdates(), 2);
+ assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 2);
+ assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+ assertEquals(fakeRd2.getAssuredSrSentUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrNotAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrTimeoutUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrWrongStatusUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrReplayErrorUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+ assertEquals(fakeRd2.getReceivedAssuredSrUpdates(), 1);
+ assertEquals(fakeRd2.getReceivedAssuredSrUpdatesAcked(), 1);
+ assertEquals(fakeRd2.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+ // Sanity check
+ sleep(500); // Let time to update to reach servers
+ assertEquals(fakeRd1.nReceivedUpdates(), 0);
+ assertTrue(fakeRd1.receivedUpdatesOk());
+
+ assertEquals(fakeRd2.nReceivedUpdates(), 1);
+ assertTrue(fakeRd2.receivedUpdatesOk());
+
+ /*******************
+ * Start a fake RS 1 connected to RS
+ */
+
+ fakeRs1 = createFakeReplicationServer(FRS1_ID, DEFAULT_GID, RS1_ID,
+ DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1,
+ fakeRd1.getServerState(), REPLY_OK_RS_SCENARIO);
+ assertNotNull(fakeRs1);
+
+ // Wait for connections to be established
+ waitForStableTopo(fakeRd1, 1, 2);
+
+ /*
+ * Send a third assured safe read update
+ */
+
+ startTime = System.currentTimeMillis();
+ try
+ {
+ fakeRd1.sendNewFakeUpdate();
+ } catch (TimeoutException e)
+ {
+ fail("No timeout is expected here");
+ }
+ sendUpdateTime = System.currentTimeMillis() - startTime;
+
+ // Check call time (should be short as RS should have acked)
+ assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+ // Check monitoring values (check that ack has been correctly received)
+ sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
+ assertEquals(fakeRd1.getAssuredSrSentUpdates(), 3);
+ assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 3);
+ assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+ assertEquals(fakeRd2.getAssuredSrSentUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrNotAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrTimeoutUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrWrongStatusUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrReplayErrorUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+ assertEquals(fakeRd2.getReceivedAssuredSrUpdates(), 2);
+ assertEquals(fakeRd2.getReceivedAssuredSrUpdatesAcked(), 2);
+ assertEquals(fakeRd2.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+ // Sanity check
+ sleep(500); // Let time to update to reach servers
+ assertEquals(fakeRd1.nReceivedUpdates(), 0);
+ assertTrue(fakeRd1.receivedUpdatesOk());
+
+ assertEquals(fakeRd2.nReceivedUpdates(), 2);
+ assertTrue(fakeRd2.receivedUpdatesOk());
+
+ assertEquals(fakeRs1.nReceivedUpdates(), 1);
+ assertTrue(fakeRs1.receivedUpdatesOk());
+
+ /*******************
+ * Shutdown fake DS 2
+ */
+
+ // Shutdown fake DS 2
+ fakeRd2.disableService();
+ fakeRd2 = null;
+
+ // Wait for disconnection to be finished
+ waitForStableTopo(fakeRd1, 0, 2);
+
+ /*
+ * Send a fourth assured safe read update
+ */
+
+ startTime = System.currentTimeMillis();
+ try
+ {
+ fakeRd1.sendNewFakeUpdate();
+ } catch (TimeoutException e)
+ {
+ fail("No timeout is expected here");
+ }
+ sendUpdateTime = System.currentTimeMillis() - startTime;
+
+ // Check call time (should be short as RS should have acked)
+ assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+ // Check monitoring values (check that ack has been correctly received)
+ sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
+ assertEquals(fakeRd1.getAssuredSrSentUpdates(), 4);
+ assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 4);
+ assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+ // Sanity check
+ sleep(500); // Let time to update to reach servers
+ assertEquals(fakeRd1.nReceivedUpdates(), 0);
+ assertTrue(fakeRd1.receivedUpdatesOk());
+
+ assertEquals(fakeRs1.nReceivedUpdates(), 2);
+ assertTrue(fakeRs1.receivedUpdatesOk());
+
+ /*******************
+ * Shutdown fake RS 1
+ */
+
+ // Shutdown fake RS 1
+ fakeRs1.shutdown();
+ fakeRs1 = null;
+
+ // Wait for disconnection to be finished
+ waitForStableTopo(fakeRd1, 0, 1);
+
+ /*
+ * Send a fifth assured safe read update
+ */
+
+ startTime = System.currentTimeMillis();
+ try
+ {
+ fakeRd1.sendNewFakeUpdate();
+ } catch (TimeoutException e)
+ {
+ fail("No timeout is expected here");
+ }
+ sendUpdateTime = System.currentTimeMillis() - startTime;
+
+ // Check call time (should be short as RS should have acked)
+ assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+ // Check monitoring values (check that ack has been correctly received)
+ sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
+ assertEquals(fakeRd1.getAssuredSrSentUpdates(), 5);
+ assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 5);
+ assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+ // Sanity check
+ sleep(500); // Let time to update to reach servers
+ assertEquals(fakeRd1.nReceivedUpdates(), 0);
+ assertTrue(fakeRd1.receivedUpdatesOk());
+ } finally
+ {
+ endTest();
+ }
+ }
+
+ /**
+ * Returns possible combinations of parameters for testSafeReadOneRSComplexPrecommit test
+ */
+ @DataProvider(name = "testSafeReadOneRSComplexPrecommitProvider")
+ private Object[][] testSafeReadOneRSComplexPrecommitProvider()
+ {
+ return new Object[][]
+ {
+ {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ {DEFAULT_GID, DEFAULT_GENID, TIMEOUT_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ {DEFAULT_GID, DEFAULT_GENID, REPLAY_ERROR_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO},
+ {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, DS_TIMEOUT_RS_SCENARIO_SAFE_READ},
+ {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ},
+ {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ},
+ {OTHER_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO}
+ };
+ }
+
+ /**
+ * See testSafeReadOneRSComplex comment.
+ */
+ @Test(dataProvider = "testSafeReadOneRSComplexPrecommitProvider", enabled = true)
+ public void testSafeReadOneRSComplexPrecommit(int otherFakeDsGid, long otherFakeDsGenId, int otherFakeDsScen,
+ int otherFakeRsGid, long otherFakeRsGenId, int otherFakeRsScen) throws Exception
+ {
+ testSafeReadOneRSComplex(otherFakeDsGid, otherFakeDsGenId, otherFakeDsScen,
+ otherFakeRsGid, otherFakeRsGenId, otherFakeRsScen);
+ }
+
+ /**
+ * Returns possible combinations of parameters for testSafeReadOneRSComplex test
+ */
+ @DataProvider(name = "testSafeReadOneRSComplexProvider")
+ private Object[][] testSafeReadOneRSComplexProvider()
+ {
+ List<List<Object>> objectArrayList = new ArrayList<List<Object>>();
+
+ // Other additional DS group id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+ // Other additional DS generation id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+ // Other additional DS scenario
+ objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_DS_SCENARIO, TIMEOUT_DS_SCENARIO, REPLAY_ERROR_DS_SCENARIO);
+ // Other additional RS group id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+ // Other additional RS generation id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+ // Other additional RS scenario
+ objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO, DS_TIMEOUT_RS_SCENARIO_SAFE_READ, DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ, DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ);
+
+ Object[][] result = new Object[objectArrayList.size()][];
+ int i = 0;
+ for (List<Object> objectArray : objectArrayList)
+ {
+ result[i] = objectArray.toArray();
+ i++;
+ }
+ return result;
+ }
+
+ /**
+ * Test safe read mode with only one real RS deployment.
+ * Test that the RS is able to acknowledge SR updates with level higher than 1
+ * and also to return errors is some errors occur.
+ * - 1 main fake DS connected to the RS
+ * - 1 other fake DS connected to the RS, with same GID as RS and same GENID as RS and always acking without error
+ * - 1 other fake DS connected to the RS, with GID, GENID, scenario...changed through the provider
+ * - 1 fake RS connected to the RS (emulating one fake DS connected to it), with same GID as RS and always acking without error
+ * - 1 other fake RS connected to the RS (emulating one fake DS connected to it), with GID scenario...changed through the provider
+ *
+ * All possible combinations tested thanks to the provider.
+ */
+ @Test(dataProvider = "testSafeReadOneRSComplexProvider", groups = "slow", enabled = false) // Working but disabled as 17.5 minutes to run
+ public void testSafeReadOneRSComplex(int otherFakeDsGid, long otherFakeDsGenId, int otherFakeDsScen,
+ int otherFakeRsGid, long otherFakeRsGenId, int otherFakeRsScen) throws Exception
+ {
+ String testCase = "testSafeReadOneRSComplex";
+
+ debugInfo("Starting " + testCase);
+
+ initTest();
+
+ try
+ {
+ /*
+ * Start real RS (the one to be tested)
+ */
+
+ // Create real RS 1
+ rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
+ testCase);
+ assertNotNull(rs1);
+
+ /*
+ * Start main DS 1 (the one which sends updates)
+ */
+
+ fakeRd1 = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
+ DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
+ TIMEOUT_DS_SCENARIO);
+ assertNotNull(fakeRd1);
+
+ /*
+ * Start another fake DS 2 connected to RS
+ */
+
+ fakeRd2 = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
+ DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
+ REPLY_OK_DS_SCENARIO);
+ assertNotNull(fakeRd2);
+
+ /*
+ * Start another fake DS 3 connected to RS
+ */
+
+ fakeRd3 = createFakeReplicationDomain(FDS3_ID, otherFakeDsGid, RS1_ID,
+ otherFakeDsGenId, ((otherFakeDsGid == DEFAULT_GID) ? true : false),
+ AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
+ otherFakeDsScen);
+ assertNotNull(fakeRd3);
+
+ /*
+ * Start fake RS (RS 1) connected to RS
+ */
+
+ fakeRs1 = createFakeReplicationServer(FRS1_ID, DEFAULT_GID, RS1_ID,
+ DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1,
+ new ServerState(), REPLY_OK_RS_SCENARIO);
+ assertNotNull(fakeRs1);
+
+ /*
+ * Start another fake RS (RS 2) connected to RS
+ */
+
+ fakeRs2 = createFakeReplicationServer(FRS2_ID, otherFakeRsGid, RS1_ID,
+ otherFakeRsGenId, ((otherFakeRsGid == DEFAULT_GID) ? true : false),
+ AssuredMode.SAFE_READ_MODE, 1, new ServerState(), otherFakeRsScen);
+ assertNotNull(fakeRs2);
+
+ // Wait for connections to be established
+ waitForStableTopo(fakeRd1, 2, 3);
+
+ /*
+ * Send an assured safe read update
+ */
+
+ long startTime = System.currentTimeMillis();
+ try
+ {
+ fakeRd1.sendNewFakeUpdate();
+ } catch (TimeoutException e)
+ {
+ fail("No timeout is expected here");
+ }
+ long sendUpdateTime = System.currentTimeMillis() - startTime;
+
+ // Compute some thing that will help determine what to check according to
+ // the current test configurarion: compute if DS and RS subject to conf
+ // change are elligible and expected for safe read assured
+ // elligible: the server should receive the ack request
+ // expected: the server should send back an ack (with or without error)
+ boolean dsIsEligible = areGroupAndGenerationIdOk(otherFakeDsGid, otherFakeDsGenId);
+ boolean rsIsEligible = areGroupAndGenerationIdOk(otherFakeRsGid, otherFakeRsGenId);
+ boolean dsIsExpected = false;
+ boolean rsIsExpected = false;
+ // Booleans to tell if we expect to see the timeout, wrong status and replay error flags
+ boolean shouldSeeTimeout = false;
+ boolean shouldSeeWrongStatus = false;
+ boolean shouldSeeReplayError = false;
+ // Booleans to tell if we expect to see the ds, rs and virtual ds connected to fake rs in server id error list
+ boolean shouldSeeDsIdInError = false;
+ boolean shouldSeeRsIdInError = false;
+ boolean shouldSeeDsRsIdInError = false;
+ if (dsIsEligible)
+ {
+ switch (otherFakeDsScen)
+ {
+ case REPLY_OK_DS_SCENARIO:
+ dsIsExpected = true;
+ break;
+ case TIMEOUT_DS_SCENARIO:
+ shouldSeeDsIdInError = true;
+ shouldSeeTimeout = true;
+ break;
+ case REPLAY_ERROR_DS_SCENARIO:
+ shouldSeeDsIdInError = true;
+ shouldSeeReplayError = true;
+ break;
+ default:
+ fail("No other scenario should be used here");
+ }
+ }
+ if (rsIsEligible)
+ {
+ switch (otherFakeRsScen)
+ {
+ case REPLY_OK_RS_SCENARIO:
+ rsIsExpected = true;
+ break;
+ case TIMEOUT_RS_SCENARIO:
+ shouldSeeRsIdInError = true;
+ shouldSeeTimeout = true;
+ break;
+ case DS_TIMEOUT_RS_SCENARIO_SAFE_READ:
+ shouldSeeDsRsIdInError = true;
+ shouldSeeTimeout = true;
+ break;
+ case DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ:
+ shouldSeeDsRsIdInError = true;
+ shouldSeeReplayError = true;
+ break;
+ case DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ:
+ shouldSeeDsRsIdInError = true;
+ shouldSeeWrongStatus = true;
+ break;
+ default:
+ fail("No other scenario should be used here");
+ }
+ }
+
+ if (!shouldSeeTimeout)
+ {
+ // Call time should have been short
+ assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+ } else // Timeout
+ {
+ if (shouldSeeDsRsIdInError) // Virtual DS timeout
+ {
+ // Should have timed out
+ assertTrue((MAX_SEND_UPDATE_TIME <= sendUpdateTime) && (sendUpdateTime <=
+ LONG_TIMEOUT));
+ } else // Normal rimeout case
+ {
+ // Should have timed out
+ assertTrue((SMALL_TIMEOUT <= sendUpdateTime) && (sendUpdateTime <=
+ LONG_TIMEOUT));
+ }
+ }
+
+ // Sleep a while as counters are updated just after sending thread is unblocked
+ sleep(500);
+
+ // Check monitoring values in DS 1
+ //
+ assertEquals(fakeRd1.getAssuredSrSentUpdates(), 1);
+ if (( (otherFakeDsGid == DEFAULT_GID) && (otherFakeDsGenId == DEFAULT_GENID) && (otherFakeDsScen != REPLY_OK_DS_SCENARIO) )
+ || ( (otherFakeRsGid == DEFAULT_GID) && (otherFakeRsGenId == DEFAULT_GENID) && (otherFakeRsScen != REPLY_OK_RS_SCENARIO) ))
+ {
+ assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 1);
+ }
+ else
+ {
+ assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 1);
+ assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
+ }
+
+
+ if (shouldSeeTimeout)
+ assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 1);
+ else
+ assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
+ if (shouldSeeWrongStatus)
+ assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 1);
+ else
+ assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
+ if (shouldSeeReplayError)
+ assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 1);
+ else
+ assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
+
+ // Check for servers in error list
+ Map<Short, Integer> expectedErrors = new HashMap<Short, Integer>();
+ if (shouldSeeDsIdInError)
+ expectedErrors.put(FDS3_ID, 1);
+ if (shouldSeeRsIdInError)
+ expectedErrors.put(FRS2_ID, 1);
+ if (shouldSeeDsRsIdInError)
+ expectedErrors.put(DS_FRS2_ID, 1);
+ checkServerErrorListsAreEqual(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates(), expectedErrors);
+
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
+ assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+ // Check monitoring values in DS 2
+ //
+ assertEquals(fakeRd2.getAssuredSrSentUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrNotAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrTimeoutUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrWrongStatusUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrReplayErrorUpdates(), 0);
+ assertEquals(fakeRd2.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+ assertEquals(fakeRd2.getReceivedAssuredSrUpdates(), 1);
+ assertEquals(fakeRd2.getReceivedAssuredSrUpdatesAcked(), 1);
+ assertEquals(fakeRd2.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+ // Check monitoring values in DS 3
+ //
+ assertEquals(fakeRd3.getAssuredSrSentUpdates(), 0);
+ assertEquals(fakeRd3.getAssuredSrAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd3.getAssuredSrNotAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd3.getAssuredSrTimeoutUpdates(), 0);
+ assertEquals(fakeRd3.getAssuredSrWrongStatusUpdates(), 0);
+ assertEquals(fakeRd3.getAssuredSrReplayErrorUpdates(), 0);
+ assertEquals(fakeRd3.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+ if (dsIsEligible)
+ {
+ assertEquals(fakeRd3.getReceivedAssuredSrUpdates(), 1);
+ if (dsIsExpected)
+ {
+ assertEquals(fakeRd3.getReceivedAssuredSrUpdatesAcked(), 1);
+ assertEquals(fakeRd3.getReceivedAssuredSrUpdatesNotAcked(), 0);
+ } else
+ {
+ if (shouldSeeReplayError && (otherFakeDsScen == REPLAY_ERROR_DS_SCENARIO))
+ {
+ // Replay error for the other DS
+ assertEquals(fakeRd3.getReceivedAssuredSrUpdatesAcked(), 0);
+ assertEquals(fakeRd3.getReceivedAssuredSrUpdatesNotAcked(), 1);
+ } else
+ {
+ assertEquals(fakeRd3.getReceivedAssuredSrUpdatesAcked(), 0);
+ assertEquals(fakeRd3.getReceivedAssuredSrUpdatesNotAcked(), 0);
+ }
+ }
+ }
+ else
+ {
+ assertEquals(fakeRd3.getReceivedAssuredSrUpdates(), 0);
+ assertEquals(fakeRd3.getReceivedAssuredSrUpdatesAcked(), 0);
+ assertEquals(fakeRd3.getReceivedAssuredSrUpdatesNotAcked(), 0);
+ }
+
+ // Sanity check
+ //
+ assertEquals(fakeRd1.nReceivedUpdates(), 0);
+ assertTrue(fakeRd1.receivedUpdatesOk());
+
+ assertEquals(fakeRd2.nReceivedUpdates(), 1);
+ assertTrue(fakeRd2.receivedUpdatesOk());
+
+ if (otherFakeDsGenId == DEFAULT_GENID)
+ assertEquals(fakeRd3.nReceivedUpdates(), 1);
+ else
+ assertEquals(fakeRd3.nReceivedUpdates(), 0);
+ assertTrue(fakeRd3.receivedUpdatesOk());
+
+ assertEquals(fakeRs1.nReceivedUpdates(), 1);
+ assertTrue(fakeRs1.receivedUpdatesOk());
+
+ if (otherFakeRsGenId == DEFAULT_GENID)
+ assertEquals(fakeRs2.nReceivedUpdates(), 1);
+ else
+ assertEquals(fakeRs2.nReceivedUpdates(), 0);
+ assertTrue(fakeRs2.receivedUpdatesOk());
+
+ } finally
+ {
+ endTest();
+ }
+ }
+
+ /**
+ * Check that the passed server error lists are equivalent
+ */
+ private void checkServerErrorListsAreEqual(Map<Short, Integer> list1, Map<Short, Integer> list2)
+ {
+ assertNotNull(list1);
+ assertNotNull(list2);
+ assertEquals(list1.size(), list2.size());
+ for (Short s : list1.keySet())
+ {
+ assertEquals(list1.get(s), list2.get(s));
+ }
+ }
}
--
Gitblit v1.10.0