From ee5658e776839088da75a481df7a99f224aa8d14 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Tue, 16 Dec 2008 17:03:47 +0000
Subject: [PATCH] Assured Replication: - all unit tests for safe data mode - assured replication code corrections (thanks to safe data unit tests) => Still every unit tests for safe read mode to do...
---
opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java | 23
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java | 469 ++++++++-
opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java | 47
opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java | 23
opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml | 2
opends/src/messages/messages/replication.properties | 4
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 2099 +++++++++++++++++++++++++++++++++++++++++++
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 172 ++-
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 12
9 files changed, 2,682 insertions(+), 169 deletions(-)
diff --git a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml
index 363c9a9..7e7f7cd 100644
--- a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml
+++ b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml
@@ -274,7 +274,7 @@
</adm:description>
<adm:default-behavior>
<adm:defined>
- <adm:value>1000ms</adm:value>
+ <adm:value>2000ms</adm:value>
</adm:defined>
</adm:default-behavior>
<adm:syntax>
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index b519ed2..64354b8 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -347,8 +347,8 @@
server %s will be aborted. Simultanate cross connection attempt ?
NOTICE_BAD_GENERATION_ID_FROM_DS_146=On suffix %s, directory server %s presented \
generation ID=%s when expected generation ID=%s
-SEVERE_ERR_DS_RECEIVED_ACK_ERROR_147=In replication service %s, the assured \
- update message %s was acknowledged with the following errors: %s
+NOTICE_DS_RECEIVED_ACK_ERROR_147=In replication service %s and server id %s, the \
+ assured update message %s was acknowledged with the following errors: %s
SEVERE_ERR_DS_ACK_TIMEOUT_148=In replication service %s, timeout after %s ms \
waiting for the acknowledgement of the assured update message: %s
SEVERE_ERR_DS_UNKNOWN_ASSURED_MODE_149=In directory server %s, received unknown \
diff --git a/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java b/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java
index 296e499..f81adda 100644
--- a/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java
+++ b/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java
@@ -27,6 +27,9 @@
package org.opends.server.replication.server;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.AckMsg;
@@ -66,18 +69,36 @@
private boolean completed = false;
/**
+ * This gives the list of servers we are willing to wait acks from and the
+ * information about the ack from the servers.
+ * key: the id of the server.
+ * value: a boolean true if we received the ack from the server,
+ * false otherwise.
+ */
+ protected Map<Short,Boolean> expectedServersAckStatus =
+ new HashMap<Short,Boolean>();
+
+ /**
* Creates a new ExpectedAcksInfo.
* @param changeNumber The change number of the assured update message
* @param requesterServerHandler The server handler of the server that sent
* the assured update message
* @param assuredMode The assured mode requested by the assured update message
+ * @param expectedServers The list of servers we want an ack from
*/
protected ExpectedAcksInfo(ChangeNumber changeNumber,
- ServerHandler requesterServerHandler, AssuredMode assuredMode)
+ ServerHandler requesterServerHandler, AssuredMode assuredMode,
+ List<Short> expectedServers)
{
this.requesterServerHandler = requesterServerHandler;
this.assuredMode = assuredMode;
this.changeNumber = changeNumber;
+
+ // Initialize list of servers we expect acks from
+ for (Short serverId : expectedServers)
+ {
+ expectedServersAckStatus.put(serverId, false);
+ }
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index cf68944..9408c24 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -276,10 +276,10 @@
// According to assured sub-mode, prepare structures to keep track of
// the acks we are interested in.
AssuredMode assuredMode = update.getAssuredMode();
- if (assuredMode != AssuredMode.SAFE_DATA_MODE)
+ if (assuredMode == AssuredMode.SAFE_DATA_MODE)
{
preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler);
- } else if (assuredMode != AssuredMode.SAFE_READ_MODE)
+ } else if (assuredMode == AssuredMode.SAFE_READ_MODE)
{
preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler);
} else
@@ -525,7 +525,15 @@
for (ServerHandler handler : replicationServers.values())
{
if (handler.getGroupId() == groupId)
- expectedServers.add(handler.getServerId());
+ // No ack expected from a RS with different group id
+ {
+ if ((generationId > 0) &&
+ (generationId == handler.getGenerationId()))
+ // No ack expected from a RS with bad gen id
+ {
+ expectedServers.add(handler.getServerId());
+ }
+ }
}
}
@@ -538,13 +546,29 @@
continue;
}
if (handler.getGroupId() == groupId)
+ // No ack expected from a DS with different group id
{
- if (handler.getStatus() == ServerStatus.NORMAL_STATUS)
+ ServerStatus serverStatus = handler.getStatus();
+ if (serverStatus == ServerStatus.NORMAL_STATUS)
{
expectedServers.add(handler.getServerId());
} else
+ // No ack expected from a DS with wrong status
{
- wrongStatusServers.add(handler.getServerId());
+ if (serverStatus == ServerStatus.DEGRADED_STATUS)
+ {
+ wrongStatusServers.add(handler.getServerId());
+ } else
+ {
+ /*
+ * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
+ * We do not want this to be reported as an error to the update
+ * maker -> no pollution or potential missunderstanding when
+ * reading logs or monitoring and it was just administration (for
+ * instance new server is being configured in topo: it goes in bad
+ * gen then then full full update).
+ */
+ }
}
}
}
@@ -589,7 +613,7 @@
UpdateMsg update, ServerHandler sourceHandler) throws IOException
{
ChangeNumber cn = update.getChangeNumber();
- boolean interestedInAcks = true;
+ boolean interestedInAcks = false;
byte safeDataLevel = update.getSafeDataLevel();
byte groupId = replicationServer.getGroupId();
byte sourceGroupId = sourceHandler.getGroupId();
@@ -600,47 +624,55 @@
Short.toString(replicationServer.getServerId()),
Byte.toString(safeDataLevel), baseDn, update.toString());
logError(errorMsg);
- interestedInAcks = false;
} else if (sourceGroupId != groupId)
{
// Assured feature does not cross different group ids
- interestedInAcks = false;
} else
{
- if (sourceHandler.isLDAPserver())
+ if ((generationId > 0) &&
+ (generationId == sourceHandler.getGenerationId()))
+ // Ignore assured updates from wrong generationid servers
{
- if (safeDataLevel == (byte) 1)
+ if (sourceHandler.isLDAPserver())
{
- // Immediatly return the ack for an assured message in safe data mode
- // with safe data level 1, coming from a DS. No need to wait for more
- // acks
- AckMsg ack = new AckMsg(cn);
- sourceHandler.sendAck(ack);
- interestedInAcks = false; // No further acks to obtain
+ if (safeDataLevel == (byte) 1)
+ {
+ // Immediatly return the ack for an assured message in safe data
+ // mode with safe data level 1, coming from a DS. No need to wait
+ // for more acks
+ AckMsg ack = new AckMsg(cn);
+ sourceHandler.sendAck(ack);
+ } else
+ {
+ if (safeDataLevel != (byte) 0)
+ {
+ // level > 1 : We need further acks
+ // The message will be posted in assured mode to elligible
+ // servers. The embedded safe data level is not changed, and his
+ // value will be used by a remote RS to determine if he must send
+ // an ack (level > 1) or not (level = 1)
+ interestedInAcks = true;
+ } else
+ {
+ // Should never happen
+ }
+ }
} else
- {
- // level > 1 : We need further acks
- // The message will be posted in assured mode to elligible servers.
- // The embedded safe data level is not changed, and his value will be
- // used by a remote RS to determine if he must send an ack (level > 1)
- // or not (level = 1)
- }
- } else
- { // A RS sent us the safe data message, for sure no futher acks to wait
- interestedInAcks = false;
- if (safeDataLevel == (byte) 1)
- {
- // The original level was 1 so the RS that sent us this message should
- // have already sent his ack to the sender DS. Level 1 has already
- // been reached so no further acks to wait
- // This should not happen in theory as the sender RS server should
- // have sent us a matching not assured message so we should not come
- // to here.
- } else
- {
- // level > 1, so Ack this message to originator RS
- AckMsg ack = new AckMsg(cn);
- sourceHandler.sendAck(ack);
+ { // A RS sent us the safe data message, for sure no futher acks to wait
+ if (safeDataLevel == (byte) 1)
+ {
+ // The original level was 1 so the RS that sent us this message
+ // should have already sent his ack to the sender DS. Level 1 has
+ // already been reached so no further acks to wait.
+ // This should not happen in theory as the sender RS server should
+ // have sent us a matching not assured message so we should not come
+ // to here.
+ } else
+ {
+ // level > 1, so Ack this message to originator RS
+ AckMsg ack = new AckMsg(cn);
+ sourceHandler.sendAck(ack);
+ }
}
}
}
@@ -654,39 +686,46 @@
for (ServerHandler handler : replicationServers.values())
{
if (handler.getGroupId() == groupId)
- expectedServers.add(handler.getServerId());
+ // No ack expected from a RS with different group id
+ {
+ if ((generationId > 0) &&
+ (generationId == handler.getGenerationId()))
+ // No ack expected from a RS with bad gen id
+ {
+ expectedServers.add(handler.getServerId());
+ }
+ }
}
}
-
- // Look for DS elligible for assured
- for (ServerHandler handler : directoryServers.values())
- {
- // Don't forward the change to the server that just sent it
- if (handler == sourceHandler)
- {
- continue;
- }
- if (handler.getGroupId() == groupId)
- expectedServers.add(handler.getServerId());
- }
}
// Return computed structures
PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
- if (interestedInAcks && (expectedServers.size() > 0))
+ int nExpectedServers = expectedServers.size();
+ if (interestedInAcks) // interestedInAcks so level > 1
{
- // Some other acks to wait for
- preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
- sourceHandler, update.getSafeDataLevel());
- preparedAssuredInfo.expectedServers = expectedServers;
- }
-
- if (interestedInAcks && (preparedAssuredInfo.expectedServers == null))
- {
- // level > 1 and source is a DS but no elligible servers found, send the
- // ack immediatly
- AckMsg ack = new AckMsg(cn);
- sourceHandler.sendAck(ack);
+ if (nExpectedServers > 0)
+ {
+ // Some other acks to wait for
+ int sdl = update.getSafeDataLevel();
+ int neededAdditionalServers = sdl - 1;
+ // Change the number of expected acks if not enough available elligible
+ // servers: the level is a best effort thing, we do not want to timeout
+ // at every assured SD update for instance if a RS has had his gen id
+ // resetted
+ byte finalSdl = ((nExpectedServers >= neededAdditionalServers) ?
+ (byte)sdl : // Keep level as it was
+ (byte)(nExpectedServers+1)); // Change level to match what's available
+ preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
+ sourceHandler, finalSdl, expectedServers);
+ preparedAssuredInfo.expectedServers = expectedServers;
+ } else
+ {
+ // level > 1 and source is a DS but no elligible servers found, send the
+ // ack immediatly
+ AckMsg ack = new AckMsg(cn);
+ sourceHandler.sendAck(ack);
+ }
}
return preparedAssuredInfo;
@@ -1436,6 +1475,9 @@
*/
public void shutdown()
{
+ // Terminate the assured timer
+ assuredTimeoutTimer.cancel();
+
// Close session with other changelogs
for (ServerHandler serverHandler : replicationServers.values())
{
diff --git a/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java b/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
index e4ce2c4..da6a716 100644
--- a/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
+++ b/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
@@ -27,6 +27,9 @@
package org.opends.server.replication.server;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
@@ -61,11 +64,14 @@
* message
* @param safeDataLevel The Safe Data level requested for the assured
* update message
+ * @param expectedServers The list of servers we want an ack from
*/
public SafeDataExpectedAcksInfo(ChangeNumber changeNumber,
- ServerHandler requesterServerHandler, byte safeDataLevel)
+ ServerHandler requesterServerHandler, byte safeDataLevel,
+ List<Short> expectedServers)
{
- super(changeNumber, requesterServerHandler, AssuredMode.SAFE_DATA_MODE);
+ super(changeNumber, requesterServerHandler, AssuredMode.SAFE_DATA_MODE,
+ expectedServers);
this.safeDataLevel = safeDataLevel;
}
@@ -88,11 +94,27 @@
return false;
}
- numReceivedAcks++;
- if (numReceivedAcks == safeDataLevel)
- return true;
- else
+ // Get the ack status for the matching server
+ short ackingServerId = ackingServer.getServerId();
+ boolean ackReceived = expectedServersAckStatus.get(ackingServerId);
+ if (ackReceived)
+ {
+ // Sanity check: this should never happen
+ if (debugEnabled())
+ TRACER.debugInfo("Received unexpected ack from server id: " +
+ ackingServerId + " ack message: " + ackMsg);
return false;
+ } else
+ {
+
+ // Mark this ack received for the server
+ expectedServersAckStatus.put(ackingServerId, true);
+ numReceivedAcks++;
+ if (numReceivedAcks == safeDataLevel)
+ return true;
+ else
+ return false;
+ }
}
/**
@@ -103,7 +125,20 @@
AckMsg ack = new AckMsg(changeNumber);
if (timeout)
+ {
+ // Fill collected errors info
ack.setHasTimeout(true);
+ // Tell wich servers did not send an ack in time
+ List<Short> failedServers = new ArrayList<Short>();
+ Set<Short> serverIds = expectedServersAckStatus.keySet();
+ for (Short serverId : serverIds)
+ {
+ boolean ackReceived = expectedServersAckStatus.get(serverId);
+ if (!ackReceived)
+ failedServers.add(serverId);
+ }
+ ack.setFailedServers(failedServers);
+ }
return ack;
}
diff --git a/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java b/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java
index 62fb049..e5cb702 100644
--- a/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java
+++ b/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java
@@ -30,9 +30,7 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.AckMsg;
@@ -65,18 +63,6 @@
private List<Short> failedServers = null;
/**
- * This gives the list of servers we are willing to wait acks from and the
- * information about the ack from the servers.
- * key: the id of the server.
- * value: a boolean true if we received the ack from the server,
- * false otherwise.
- * This must not include servers we already identified they are in wrong
- * status, but just servers that are in normal status.
- */
- private Map<Short,Boolean> expectedServersAckStatus =
- new HashMap<Short,Boolean>();
-
- /**
* Number of servers we want an ack from and from which we received the ack.
* Said differently: the number of servers in expectedServersAckStatus whose
* value is true. When this value reaches the size of expectedServersAckStatus
@@ -100,7 +86,8 @@
ServerHandler requesterServerHandler, List<Short> expectedServers,
List<Short> wrongStatusServers)
{
- super(changeNumber, requesterServerHandler, AssuredMode.SAFE_READ_MODE);
+ super(changeNumber, requesterServerHandler, AssuredMode.SAFE_READ_MODE,
+ expectedServers);
// Keep track of potential servers detected in wrong status
if (wrongStatusServers.size() > 0)
@@ -108,12 +95,6 @@
hasWrongStatus = true;
failedServers = wrongStatusServers;
}
-
- // Initialize list of servers we expect acks from
- for (Short serverId : expectedServers)
- {
- expectedServersAckStatus.put(serverId, false);
- }
}
/**
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 1301a82..fa95572 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -831,7 +831,7 @@
broker.getRsServerId());
break;
default:
- // Should no happen
+ // Should not happen
}
throw new TimeoutException("No ack received for message cn: " + cn +
@@ -914,8 +914,8 @@
{
// Some problems detected: message not correclty reached every requested
// servers. Log problem
- Message errorMsg = ERR_DS_RECEIVED_ACK_ERROR.get(serviceID,
- update.toString(), ack.errorsToString());
+ Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get(serviceID,
+ Short.toString(serverID), update.toString(), ack.errorsToString());
logError(errorMsg);
List<Short> failedServers = ack.getFailedServers();
@@ -954,7 +954,7 @@
}
break;
default:
- // Should no happen
+ // Should not happen
}
} else
{
@@ -969,7 +969,7 @@
assuredSdAcknowledgedUpdates.incrementAndGet();
break;
default:
- // Should no happen
+ // Should not happen
}
}
}
@@ -2393,7 +2393,7 @@
assuredSdSentUpdates.incrementAndGet();
break;
default:
- // Should no happen
+ // Should not happen
}
// Now wait for ack matching the sent assured update
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index d643ab0..87284b3 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
@@ -30,6 +30,7 @@
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -37,6 +38,8 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.StringTokenizer;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
import org.opends.server.types.ResultCode;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -63,15 +66,19 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
+import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.ByteStringFactory;
import org.testng.annotations.BeforeClass;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
+import org.opends.server.types.LockManager;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -79,6 +86,7 @@
import static org.testng.Assert.fail;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
@@ -120,6 +128,7 @@
private static final int NO_TIMEOUT_SCENARIO = 3;
private static final int SAFE_READ_MANY_ERRORS = 4;
private static final int SAFE_DATA_MANY_ERRORS = 5;
+ private static final int NO_READ = 6;
// The tracer object for the debug logger
private static final DebugTracer TRACER = getTracer();
@@ -288,7 +297,10 @@
/**
* The fake replication server used to emulate RS behaviour the way we want
- * for assured features test
+ * for assured features test.
+ * This fake replication server is able to receive a DS connection only.
+ * According to the configured scenario, it will answer to updates with acks
+ * as the scenario is requesting.
*/
private class FakeReplicationServer extends Thread
{
@@ -323,13 +335,16 @@
// where the main code can perform test assertion
private boolean scenarioExecuted = false;
+ private ChangeNumberGenerator gen = null;
+
// Constructor for RS receiving updates in SR assured mode or not assured
// The assured boolean means:
// - true: SR mode
// - false: not assured
- public FakeReplicationServer(int port, short serverId, boolean assured)
+ public FakeReplicationServer(byte groupId, int port, short serverId, boolean assured)
{
+ this.groupId = groupId;
this.port = port;
this.serverId = serverId;
@@ -341,9 +356,9 @@
}
// Constructor for RS receiving updates in SD assured mode
- public FakeReplicationServer(int port, short serverId, int safeDataLevel)
+ public FakeReplicationServer(byte groupId, int port, short serverId, int safeDataLevel)
{
-
+ this.groupId = groupId;
this.port = port;
this.serverId = serverId;
@@ -358,6 +373,8 @@
public void start(int scenario)
{
+ gen = new ChangeNumberGenerator((short)3, 0L);
+
// Store expected test case
this.scenario = scenario;
@@ -472,7 +489,6 @@
serverState = serverStartMsg.getServerState();
generationId = serverStartMsg.getGenerationId();
windowSize = serverStartMsg.getWindowSize();
- groupId = serverStartMsg.getGroupId();
sslEncryption = serverStartMsg.getSSLEncryption();
// Send replication server start
@@ -586,11 +602,62 @@
case SAFE_DATA_MANY_ERRORS:
executeSafeDataManyErrorsScenario();
break;
+ case NO_READ:
+ // Nothing to execute, just let session opne. This scenario used to
+ // send updates from the RS to the DS (reply test cases)
+ while (!shutdown)
+ {
+ try
+ {
+ sleep(5000);
+ } catch (InterruptedException ex)
+ {
+ // Going shutdown ?
+ break;
+ }
+ }
+ break;
default:
fail("Unknown scenario: " + scenario);
}
}
+ /*
+ * Make the RS send an add message with the passed entry and return the ack
+ * message it receives from the DS
+ */
+ private AckMsg sendAssuredAddMsg(Entry entry, String parentUid) throws SocketTimeoutException
+ {
+ try
+ {
+ // Create add message
+ AddMsg addMsg =
+ new AddMsg(gen.newChangeNumber(), entry.getDN().toString(), UUID.randomUUID().toString(),
+ parentUid,
+ entry.getObjectClassAttribute(),
+ entry.getAttributes(), null );
+
+ // Send add message in assured mode
+ addMsg.setAssured(isAssured);
+ addMsg.setAssuredMode(assuredMode);
+ addMsg.setSafeDataLevel(safeDataLevel);
+ session.publish(addMsg);
+
+ // Read and return matching ack
+ AckMsg ackMsg = (AckMsg)session.receive();
+ return ackMsg;
+
+ } catch(SocketTimeoutException e)
+ {
+ throw e;
+ } catch (Throwable t)
+ {
+ fail("Unexpected exception in fake replication server sendAddUpdate " +
+ "processing: " + t);
+ return null;
+ }
+ }
+
/**
* Read the coming update and check parameters are not assured
*/
@@ -800,11 +867,26 @@
}
/**
+ * Return various group id values
+ */
+ @DataProvider(name = "rsGroupIdProvider")
+ private Object[][] rsGroupIdProvider()
+ {
+ return new Object[][]
+ {
+ { (byte)1 },
+ { (byte)2 }
+ };
+ }
+
+ /**
* Tests that a DS performing a modification in safe data mode waits for
* the ack of the RS for the configured timeout time, then times out.
+ * If the RS group id is not the same as the DS one, this must not time out
+ * and return immediately.
*/
- @Test
- public void testSafeDataModeTimeout() throws Exception
+ @Test(dataProvider = "rsGroupIdProvider")
+ public void testSafeDataModeTimeout(byte rsGroupId) throws Exception
{
int TIMEOUT = 5000;
@@ -813,7 +895,7 @@
{
// Create and start a RS expecting clients in safe data assured mode with
// safe data level 2
- replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+ replicationServer = new FakeReplicationServer(rsGroupId, replServerPort, RS_SERVER_ID,
1);
replicationServer.start(TIMEOUT_SCENARIO);
@@ -830,34 +912,61 @@
"objectClass: organizationalUnit\n";
addEntry(TestCaseUtils.entryFromLdifString(entry));
- // In this scenario, the fake RS will not send back an ack so we expect
- // the add entry code (LDAP client code emulation) to be blocked for the
- // timeout value at least. If the time we have slept is lower, timeout
- // handling code is not working...
long endTime = System.currentTimeMillis();
- assertTrue((endTime - startTime) >= TIMEOUT);
- assertTrue(replicationServer.isScenarioExecuted());
- // Check monitoring values
- DN baseDn = DN.decode(SAFE_DATA_DN);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
- Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
- assertTrue(errorsByServer.isEmpty());
- assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 1);
- errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
- // errors by server list for sd mode should be [[rsId:1]]
- assertEquals(errorsByServer.size(), 1);
- Integer nError = errorsByServer.get((short)RS_SERVER_ID);
- assertNotNull(nError);
- assertEquals(nError.intValue(), 1);
+ if (rsGroupId == (byte)1)
+ {
+ // RS has same group id as DS
+ // In this scenario, the fake RS will not send back an ack so we expect
+ // the add entry code (LDAP client code emulation) to be blocked for the
+ // timeout value at least. If the time we have slept is lower, timeout
+ // handling code is not working...
+ assertTrue((endTime - startTime) >= TIMEOUT);
+ assertTrue(replicationServer.isScenarioExecuted());
+ // Check monitoring values
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ DN baseDn = DN.decode(SAFE_DATA_DN);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
+ Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
+ assertTrue(errorsByServer.isEmpty());
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 1);
+ errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
+ // errors by server list for sd mode should be [[rsId:1]]
+ assertEquals(errorsByServer.size(), 1);
+ Integer nError = errorsByServer.get((short)RS_SERVER_ID);
+ assertNotNull(nError);
+ assertEquals(nError.intValue(), 1);
+ } else
+ {
+ // RS has a different group id, addEntry should have returned quickly
+ assertTrue((endTime - startTime) < 3000);
+
+ // No error should be seen in monitoring and update should have not been
+ // sent in assured mode
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ DN baseDn = DN.decode(SAFE_DATA_DN);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
+ Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
+ assertTrue(errorsByServer.isEmpty());
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
+ errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
+ assertTrue(errorsByServer.isEmpty());
+ }
} finally
{
endTest();
@@ -867,9 +976,11 @@
/**
* Tests that a DS performing a modification in safe read mode waits for
* the ack of the RS for the configured timeout time, then times out.
+ * If the RS group id is not the same as the DS one, this must not time out
+ * and return immediately.
*/
- @Test
- public void testSafeReadModeTimeout() throws Exception
+ @Test(dataProvider = "rsGroupIdProvider")
+ public void testSafeReadModeTimeout(byte rsGroupId) throws Exception
{
int TIMEOUT = 5000;
@@ -877,7 +988,7 @@
try
{
// Create and start a RS expecting clients in safe read assured mode
- replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+ replicationServer = new FakeReplicationServer(rsGroupId, replServerPort, RS_SERVER_ID,
true);
replicationServer.start(TIMEOUT_SCENARIO);
@@ -894,34 +1005,61 @@
"objectClass: organizationalUnit\n";
addEntry(TestCaseUtils.entryFromLdifString(entry));
- // In this scenario, the fake RS will not send back an ack so we expect
- // the add entry code (LDAP client code emulation) to be blocked for the
- // timeout value at least. If the time we have slept is lower, timeout
- // handling code is not working...
long endTime = System.currentTimeMillis();
- assertTrue((endTime - startTime) >= TIMEOUT);
- assertTrue(replicationServer.isScenarioExecuted());
- // Check monitoring values
- DN baseDn = DN.decode(SAFE_READ_DN);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 1);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 1);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 1);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
- Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
- // errors by server list for sr mode should be [[rsId:1]]
- assertEquals(errorsByServer.size(), 1);
- Integer nError = errorsByServer.get((short)RS_SERVER_ID);
- assertNotNull(nError);
- assertEquals(nError.intValue(), 1);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
- assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
- errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
- assertTrue(errorsByServer.isEmpty());
+ if (rsGroupId == (byte)1)
+ {
+ // RS has same group id as DS
+ // In this scenario, the fake RS will not send back an ack so we expect
+ // the add entry code (LDAP client code emulation) to be blocked for the
+ // timeout value at least. If the time we have slept is lower, timeout
+ // handling code is not working...
+ assertTrue((endTime - startTime) >= TIMEOUT);
+ assertTrue(replicationServer.isScenarioExecuted());
+ // Check monitoring values
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ DN baseDn = DN.decode(SAFE_READ_DN);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 1);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 1);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 1);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
+ Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
+ // errors by server list for sr mode should be [[rsId:1]]
+ assertEquals(errorsByServer.size(), 1);
+ Integer nError = errorsByServer.get((short)RS_SERVER_ID);
+ assertNotNull(nError);
+ assertEquals(nError.intValue(), 1);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
+ errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
+ assertTrue(errorsByServer.isEmpty());
+ } else
+ {
+ // RS has a different group id, addEntry should have returned quickly
+ assertTrue((endTime - startTime) < 3000);
+
+ // No error should be seen in monitoring and update should have not been
+ // sent in assured mode
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ DN baseDn = DN.decode(SAFE_READ_DN);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
+ Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
+ assertTrue(errorsByServer.isEmpty());
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
+ assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
+ errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
+ assertTrue(errorsByServer.isEmpty());
+ }
} finally
{
endTest();
@@ -940,7 +1078,7 @@
try
{
// Create and start a RS expecting not assured clients
- replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+ replicationServer = new FakeReplicationServer((byte)1, replServerPort, RS_SERVER_ID,
false);
replicationServer.start(NOT_ASSURED_SCENARIO);
@@ -1034,7 +1172,7 @@
{
// Create and start a RS expecting clients in safe data assured mode with
// safe data level 2
- replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+ replicationServer = new FakeReplicationServer((byte)1, replServerPort, RS_SERVER_ID,
2);
replicationServer.start(NO_TIMEOUT_SCENARIO);
@@ -1060,6 +1198,7 @@
assertTrue(replicationServer.isScenarioExecuted());
// Check monitoring values
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
DN baseDn = DN.decode(SAFE_DATA_DN);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
@@ -1094,7 +1233,7 @@
try
{
// Create and start a RS expecting clients in safe read assured mode
- replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+ replicationServer = new FakeReplicationServer((byte)1, replServerPort, RS_SERVER_ID,
true);
replicationServer.start(NO_TIMEOUT_SCENARIO);
@@ -1120,6 +1259,7 @@
assertTrue(replicationServer.isScenarioExecuted());
// Check monitoring values
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
DN baseDn = DN.decode(SAFE_READ_DN);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 1);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 1);
@@ -1142,6 +1282,195 @@
}
/**
+ * Get the entryUUID for a given DN.
+ *
+ * @throws Exception if the entry does not exist or does not have
+ * an entryUUID.
+ */
+ private String getEntryUUID(DN dn) throws Exception
+ {
+ Entry newEntry;
+ int count = 10;
+ if (count<1)
+ count=1;
+ String found = null;
+ while ((count> 0) && (found == null))
+ {
+ Thread.sleep(100);
+
+ Lock lock = null;
+ for (int i=0; i < 3; i++)
+ {
+ lock = LockManager.lockRead(dn);
+ if (lock != null)
+ {
+ break;
+ }
+ }
+
+ if (lock == null)
+ {
+ throw new Exception("could not lock entry " + dn);
+ }
+
+ try
+ {
+ newEntry = DirectoryServer.getEntry(dn);
+
+ if (newEntry != null)
+ {
+ List<Attribute> tmpAttrList = newEntry.getAttribute("entryuuid");
+ Attribute tmpAttr = tmpAttrList.get(0);
+
+ for (AttributeValue val : tmpAttr)
+ {
+ found = val.getStringValue();
+ break;
+ }
+ }
+ }
+ finally
+ {
+ LockManager.unlock(dn, lock);
+ }
+ count --;
+ }
+ if (found == null)
+ throw new Exception("Entry: " + dn + " Could not be found.");
+ return found;
+ }
+
+ /**
+ * Tests that a DS receiving an update from a RS in safe read mode effectively
+ * sends an ack back (with or without error)
+ */
+ @Test(dataProvider = "rsGroupIdProvider")
+ public void testSafeReadModeReply(byte rsGroupId) throws Exception
+ {
+
+ int TIMEOUT = 5000;
+ String testcase = "testSafeReadModeReply";
+ try
+ {
+ // Create and start a RS expecting clients in safe read assured mode
+ replicationServer = new FakeReplicationServer(rsGroupId, replServerPort, RS_SERVER_ID,
+ true);
+ replicationServer.start(NO_READ);
+
+ // Create a safe read assured domain
+ safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
+ TIMEOUT);
+ // Wait for connection of domain to RS
+ waitForConnectionToRs(testcase, replicationServer);
+
+ /*
+ * Send an update from the RS and get the ack
+ */
+
+ // Make the RS send an assured add message
+ String entryStr = "dn: ou=assured-sr-reply-entry," + SAFE_READ_DN + "\n" +
+ "objectClass: top\n" +
+ "objectClass: organizationalUnit\n";
+ Entry entry = TestCaseUtils.entryFromLdifString(entryStr);
+ String parentUid = getEntryUUID(DN.decode(SAFE_READ_DN));
+ AckMsg ackMsg = null;
+
+ try {
+ ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
+
+ if (rsGroupId == (byte)2)
+ fail("Should only go here for RS with same group id as DS");
+
+ // Ack received, replay has occurred
+ assertNotNull(DirectoryServer.getEntry(entry.getDN()));
+
+ // Check that DS replied an ack without errors anyway
+ assertFalse(ackMsg.hasTimeout());
+ assertFalse(ackMsg.hasReplayError());
+ assertFalse(ackMsg.hasWrongStatus());
+ assertEquals(ackMsg.getFailedServers().size(), 0);
+ } catch (SocketTimeoutException e)
+ {
+ // Expected
+ if (rsGroupId == (byte)1)
+ fail("Should only go here for RS with group id different from DS one");
+
+ return;
+ }
+
+ /*
+ * Send un update with error from the RS and get the ack with error
+ */
+
+ // Make the RS send a not possible assured add message
+
+ // TODO: make the domain return an error: use a plugin ?
+ // The resolution code does not generate any error so we need to find a
+ // way to have the replay not working to test this...
+
+ // Check that DS replied an ack with errors
+// assertFalse(ackMsg.hasTimeout());
+// assertTrue(ackMsg.hasReplayError());
+// assertFalse(ackMsg.hasWrongStatus());
+// List<Short> failedServers = ackMsg.getFailedServers();
+// assertEquals(failedServers.size(), 1);
+// assertEquals((short)failedServers.get(0), (short)1);
+ } finally
+ {
+ endTest();
+ }
+ }
+
+ /**
+ * Tests that a DS receiving an update from a RS in safe data mode does not
+ * send back and ack (only safe read is taken into account in DS replay)
+ */
+ @Test(dataProvider = "rsGroupIdProvider")
+ public void testSafeDataModeReply(byte rsGroupId) throws Exception
+ {
+
+ int TIMEOUT = 5000;
+ String testcase = "testSafeDataModeReply";
+ try
+ {
+ // Create and start a RS expecting clients in safe data assured mode
+ replicationServer = new FakeReplicationServer(rsGroupId, replServerPort, RS_SERVER_ID,
+ 4);
+ replicationServer.start(NO_READ);
+
+ // Create a safe data assured domain
+ safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 4,
+ TIMEOUT);
+ // Wait for connection of domain to RS
+ waitForConnectionToRs(testcase, replicationServer);
+
+ // Make the RS send an assured add message: we expect a read timeout as
+ // safe data should be ignored by DS
+ String entryStr = "dn: ou=assured-sd-reply-entry," + SAFE_DATA_DN + "\n" +
+ "objectClass: top\n" +
+ "objectClass: organizationalUnit\n";
+ Entry entry = TestCaseUtils.entryFromLdifString(entryStr);
+ String parentUid = getEntryUUID(DN.decode(SAFE_DATA_DN));
+
+ AckMsg ackMsg = null;
+ try
+ {
+ ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
+ } catch (SocketTimeoutException e)
+ {
+ // Expected
+ return;
+ }
+
+ fail("DS should not reply an ack in safe data mode, however, it replied: " +
+ ackMsg);
+ } finally
+ {
+ endTest();
+ }
+ }
+
+ /**
* DS performs many successive modifications in safe data mode and receives RS
* acks with various errors. Check for monitoring right errors
*/
@@ -1155,7 +1484,7 @@
{
// Create and start a RS expecting clients in safe data assured mode with
// safe data level 3
- replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+ replicationServer = new FakeReplicationServer((byte)1, replServerPort, RS_SERVER_ID,
3);
replicationServer.start(SAFE_DATA_MANY_ERRORS);
@@ -1184,6 +1513,7 @@
// The expected ack for the first update is:
// - timeout error
// - server 10 error
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
DN baseDn = DN.decode(SAFE_DATA_DN);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
@@ -1218,6 +1548,7 @@
// The expected ack for the second update is:
// - timeout error
// - server 10 error, server 20 error
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
baseDn = DN.decode(SAFE_DATA_DN);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
@@ -1254,6 +1585,7 @@
// Check monitoring values
// No ack should have comen back, so timeout incremented (flag and error for rs)
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
baseDn = DN.decode(SAFE_DATA_DN);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
@@ -1298,7 +1630,7 @@
try
{
// Create and start a RS expecting clients in safe read assured mode
- replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+ replicationServer = new FakeReplicationServer((byte)1, replServerPort, RS_SERVER_ID,
true);
replicationServer.start(SAFE_READ_MANY_ERRORS);
@@ -1327,6 +1659,7 @@
// The expected ack for the first update is:
// - replay error
// - server 10 error, server 20 error
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
DN baseDn = DN.decode(SAFE_READ_DN);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 1);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
@@ -1366,6 +1699,7 @@
// - wrong status error
// - replay error
// - server 10 error, server 20 error, server 30 error
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 2);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 2);
@@ -1404,6 +1738,7 @@
// Check monitoring values
// No ack should have comen back, so timeout incremented (flag and error for rs)
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 3);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 3);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
new file mode 100644
index 0000000..d08f668
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -0,0 +1,2099 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.server;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.config.ConfigException;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.AssuredMode;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartMsg;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.ChangeNumberGenerator;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.service.ReplicationDomain;
+import org.opends.server.types.DirectoryException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import static org.opends.server.TestCaseUtils.*;
+import static org.testng.Assert.fail;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertEquals;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
+/**
+ * Test Server part of the assured feature in both safe data and
+ * safe read modes.
+ */
+public class AssuredReplicationServerTest
+ extends ReplicationTestCase
+{
+
+ private String testName = this.getClass().getSimpleName();
+ // The tracer object for the debug logger
+ private static final DebugTracer TRACER = getTracer();
+ private int rs1Port = -1;
+ private int rs2Port = -1;
+ private int rs3Port = -1;
+ private static final short FDS1_ID = 1;
+ private static final short FDS2_ID = 2;
+ private static final short FDS3_ID = 3;
+ private static final short FRS1_ID = 11;
+ private static final short FRS2_ID = 12;
+ private static final short FRS3_ID = 13;
+ private static final short RS1_ID = 101;
+ private static final short RS2_ID = 102;
+ private static final short RS3_ID = 103;
+ private FakeReplicationDomain fakeRd1 = null;
+ private FakeReplicationDomain fakeRd2 = null;
+ private FakeReplicationDomain fakeRd3 = null;
+ private FakeReplicationServer fakeRs1 = null;
+ private FakeReplicationServer fakeRs2 = null;
+ private FakeReplicationServer fakeRs3 = null;
+ private ReplicationServer rs1 = null;
+ private ReplicationServer rs2 = null;
+ private ReplicationServer rs3 = null;
+
+ // Small assured timeout value (timeout to be used in first RS receiving an
+ // assured update from a DS)
+ private static final int SMALL_TIMEOUT = 3000;
+ // Long assured timeout value (timeout to use in DS when sending an assured
+ // update)
+ private static final int LONG_TIMEOUT = 5000;
+ // Expected max time for sending an assured update and receive its ack
+ // (without errors)
+ private static final int MAX_SEND_UPDATE_TIME = 2000;
+
+ // Default group id
+ private static final int DEFAULT_GID = 1;
+ // Other group id
+ private static final int OTHER_GID = 2;
+
+ // Default generation id
+ private static long DEFAULT_GENID = EMPTY_DN_GENID;
+ // Other generation id
+ private static long OTHER_GENID = 500L;
+
+ /*
+ * Definitions for the scenario of the fake DS
+ */
+ // DS receives updates and replies acks with no errors to every updates
+ private static final int REPLY_OK_DS_SCENARIO = 1;
+ // DS receives acks but does not respond (makes timeouts)
+ private static final int TIMEOUT_DS_SCENARIO = 2;
+ // DS receives updates and replies ack with replay error flags
+ private static final int REPLAY_ERROR_DS_SCENARIO = 3;
+
+ /*
+ * Definitions for the scenario of the fake RS
+ */
+ // RS receives updates and replies acks with no errors to every updates
+ private static final int REPLY_OK_RS_SCENARIO = 11;
+ // RS receives acks but does not respond (makes timeouts)
+ private static final int TIMEOUT_RS_SCENARIO = 12;
+ // RS is used for sending updates (with sendNewFakeUpdate()) and receive acks, synchronously
+ private static final int SENDER_RS_SCENARIO = 13;
+
+ private void debugInfo(String s)
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("** TEST **" + s);
+ }
+ }
+
+ /**
+ * Before starting the tests configure some stuff
+ */
+ @BeforeClass
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ // Find a free port for the replication servers
+ ServerSocket socket1 = TestCaseUtils.bindFreePort();
+ ServerSocket socket2 = TestCaseUtils.bindFreePort();
+ ServerSocket socket3 = TestCaseUtils.bindFreePort();
+ rs1Port = socket1.getLocalPort();
+ rs2Port = socket2.getLocalPort();
+ rs3Port = socket3.getLocalPort();
+ socket1.close();
+ socket2.close();
+ socket3.close();
+ }
+
+ private void initTest()
+ {
+ fakeRd1 = null;
+ fakeRd2 = null;
+ fakeRd3 = null;
+ fakeRs1 = null;
+ fakeRs2 = null;
+ fakeRs3 = null;
+ rs1 = null;
+ rs2 = null;
+ rs3 = null;
+ }
+
+ private void endTest()
+ {
+ // Shutdown fake DSs
+
+ if (fakeRd1 != null)
+ {
+ fakeRd1.disableService();
+ fakeRd1 = null;
+ }
+
+ if (fakeRd2 != null)
+ {
+ fakeRd2.disableService();
+ fakeRd2 = null;
+ }
+
+ if (fakeRd3 != null)
+ {
+ fakeRd3.disableService();
+ fakeRd3 = null;
+ }
+
+ // Shutdown fake RSs
+
+ if (fakeRs1 != null)
+ {
+ fakeRs1.shutdown();
+ fakeRs1 = null;
+ }
+
+ if (fakeRs2 != null)
+ {
+ fakeRs2.shutdown();
+ fakeRs2 = null;
+ }
+
+ if (fakeRs3 != null)
+ {
+ fakeRs3.shutdown();
+ fakeRs3 = null;
+ }
+
+ // Shutdown RSs
+
+ if (rs1 != null)
+ {
+ rs1.clearDb();
+ rs1.remove();
+ rs1 = null;
+ }
+
+ if (rs2 != null)
+ {
+ rs2.clearDb();
+ rs2.remove();
+ rs2 = null;
+ }
+ if (rs3 != null)
+ {
+ rs3.clearDb();
+ rs3.remove();
+ rs3 = null;
+ }
+ }
+
+ /**
+ * Creates and connects a new fake replication domain, using the passed scenario.
+ */
+ private FakeReplicationDomain createFakeReplicationDomain(short serverId,
+ int groupId, short rsId, long generationId, boolean assured,
+ AssuredMode assuredMode, int safeDataLevel, long assuredTimeout,
+ int scenario)
+ {
+ try
+ {
+ // Set port to right real RS according to its id
+ int rsPort = -1;
+ switch (rsId)
+ {
+ case RS1_ID:
+ rsPort = rs1Port;
+ break;
+ case RS2_ID:
+ rsPort = rs2Port;
+ break;
+ case RS3_ID:
+ rsPort = rs3Port;
+ break;
+ default:
+ fail("Unknown RS id: " + rsId);
+ }
+
+ FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain(
+ TEST_ROOT_DN_STRING, serverId, "localhost:" + rsPort, generationId,
+ (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout, scenario);
+
+ // Test connection
+ assertTrue(fakeReplicationDomain.isConnected());
+ int rdPort = -1;
+ // Check connected server port
+ String serverStr = fakeReplicationDomain.getReplicationServer();
+ int index = serverStr.lastIndexOf(':');
+ if ((index == -1) || (index >= serverStr.length()))
+ fail("Enable to find port number in: " + serverStr);
+ String rdPortStr = serverStr.substring(index + 1);
+ try
+ {
+ rdPort = (new Integer(rdPortStr)).intValue();
+ } catch (Exception e)
+ {
+ fail("Enable to get an int from: " + rdPortStr);
+ }
+ assertEquals(rdPort, rsPort);
+
+ return fakeReplicationDomain;
+ } catch (Exception e)
+ {
+ fail("createFakeReplicationDomain " + e.getMessage());
+ }
+ return null;
+ }
+
+ /**
+ * Creates and connects a new fake replication server, using the passed scenario.
+ */
+ private FakeReplicationServer createFakeReplicationServer(short serverId,
+ int groupId, short rsId, long generationId, boolean assured,
+ AssuredMode assuredMode, int safeDataLevel, int scenario)
+ {
+ try
+ {
+ // Set port to right real RS according to its id
+ int rsPort = -1;
+ switch (rsId)
+ {
+ case RS1_ID:
+ rsPort = rs1Port;
+ break;
+ case RS2_ID:
+ rsPort = rs2Port;
+ break;
+ case RS3_ID:
+ rsPort = rs3Port;
+ break;
+ default:
+ fail("Unknown RS id: " + rsId);
+ }
+
+ FakeReplicationServer fakeReplicationServer = new FakeReplicationServer(
+ rsPort, serverId, assured, assuredMode, (byte)safeDataLevel, (byte)groupId,
+ TEST_ROOT_DN_STRING, generationId);
+
+ // Connect fake RS to the real RS
+ assertTrue(fakeReplicationServer.connect());
+
+ // Start wished scenario
+ fakeReplicationServer.start(scenario);
+
+ return fakeReplicationServer;
+ } catch (Exception e)
+ {
+ fail("createFakeReplicationServer " + e.getMessage());
+ }
+ return null;
+ }
+
+ /**
+ * Creates a new real replication server (one which is to be tested).
+ */
+ private ReplicationServer createReplicationServer(short serverId,
+ int groupId, long assuredTimeout, String testCase)
+ {
+ SortedSet<String> replServers = new TreeSet<String>();
+ try
+ {
+ int port = -1;
+ if (serverId == RS1_ID)
+ {
+ port = rs1Port;
+ if (testCase.equals("testSafeDataManyRealRSs"))
+ {
+ // Every 3 RSs connected together
+ replServers.add("localhost:" + rs2Port);
+ replServers.add("localhost:" + rs3Port);
+ } else
+ {
+ // Let this server alone
+ }
+ } else if (serverId == RS2_ID)
+ {
+ port = rs2Port;
+ if (testCase.equals("testSafeDataManyRealRSs"))
+ {
+ // Every 3 RSs connected together
+ replServers.add("localhost:" + rs1Port);
+ replServers.add("localhost:" + rs3Port);
+ } else
+ {
+ // Let this server alone
+ }
+ } else if (serverId == RS3_ID)
+ {
+ port = rs3Port;
+ if (testCase.equals("testSafeDataManyRealRSs"))
+ {
+ // Every 3 RSs connected together
+ replServers.add("localhost:" + rs1Port);
+ replServers.add("localhost:" + rs2Port);
+ } else
+ {
+ // Let this server alone
+ }
+ } else
+ {
+ fail("Unknown replication server id.");
+ }
+
+ String dir = testName + serverId + testCase + "Db";
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100,
+ replServers, groupId, assuredTimeout, 5000);
+ ReplicationServer replicationServer = new ReplicationServer(conf);
+ return replicationServer;
+
+ } catch (Exception e)
+ {
+ fail("createReplicationServer " + e.getMessage());
+ }
+ return null;
+ }
+
+ /**
+ * Fake replication domain implementation to test the replication server
+ * regarding the assured feature.
+ * According to the configured scenario, it will answer to updates with acks
+ * as the scenario is requesting.
+ */
+ public class FakeReplicationDomain extends ReplicationDomain
+ {
+ // The scenario this DS is expecting
+
+ private int scenario = -1;
+ private long generationId = -1;
+ private ProtocolSession session = null;
+
+ private ChangeNumberGenerator gen = null;
+
+ // False if a received update had assured parameters not as expected
+ private boolean everyUpdatesAreOk = true;
+ // Number of received updates
+ private int nReceivedUpdates = 0;
+
+ /**
+ * Creates a fake replication domain (DS)
+ * @param serviceID The base dn used at connection to RS
+ * @param serverID our server id
+ * @param replicationServer the URS of the RS we will connect to
+ * @param generationId the generation id we use at connection to real RS
+ * @param groupId our group id
+ * @param assured do we expect incoming assured updates (also used for outgoing updates)
+ * @param assuredMode the expected assured mode of the incoming updates (also used for outgoing updates)
+ * @param safeDataLevel the expected safe data level of the incoming updates (also used for outgoing updates)
+ * @param assuredTimeout the assured timeout used when sending updates
+ * @param scenario the scenario we are creating for (implies particular
+ * behaviour upon reception of updates)
+ * @throws org.opends.server.config.ConfigException
+ */
+ public FakeReplicationDomain(
+ String serviceID,
+ short serverID,
+ String replicationServer,
+ long generationId,
+ byte groupId,
+ boolean assured,
+ AssuredMode assuredMode,
+ byte safeDataLevel,
+ long assuredTimeout,
+ int scenario) throws ConfigException
+ {
+ super(serviceID, serverID);
+ List<String> replicationServers = new ArrayList<String>();
+ replicationServers.add(replicationServer);
+ this.generationId = generationId;
+ setGroupId(groupId);
+ setAssured(assured);
+ setAssuredMode(assuredMode);
+ setAssuredSdLevel(safeDataLevel);
+ setAssuredTimeout(assuredTimeout);
+ this.scenario = scenario;
+
+ gen = new ChangeNumberGenerator(serverID, 0L);
+
+ startPublishService(replicationServers, 100, 1000);
+ startListenService();
+ }
+
+ public boolean receivedUpdatesOk()
+ {
+ return everyUpdatesAreOk;
+ }
+
+ public int nReceivedUpdates()
+ {
+ return nReceivedUpdates;
+ }
+
+ /**
+ * To get the session reference to be able to send our own acks
+ */
+ @Override
+ public void sessionInitiated(
+ ServerStatus initStatus,
+ ServerState replicationServerState,
+ long generationId,
+ ProtocolSession session)
+ {
+ super.sessionInitiated(initStatus, replicationServerState, generationId, session);
+ this.session = session;
+ }
+
+ @Override
+ public long countEntries() throws DirectoryException
+ {
+ // Not needed for this test
+ return -1;
+ }
+
+ @Override
+ protected void exportBackend(OutputStream output) throws DirectoryException
+ {
+ // Not needed for this test
+ }
+
+ @Override
+ public long getGenerationID()
+ {
+ return generationId;
+ }
+
+ @Override
+ protected void importBackend(InputStream input) throws DirectoryException
+ {
+ // Not needed for this test
+ }
+
+ @Override
+ public boolean processUpdate(UpdateMsg updateMsg)
+ {
+ try
+ {
+ checkUpdateAssuredParameters(updateMsg);
+ nReceivedUpdates++;
+
+ // Now execute the requested scenario
+ AckMsg ackMsg = null;
+ switch (scenario)
+ {
+ case REPLY_OK_DS_SCENARIO:
+ // Send the ack without errors
+ ackMsg = new AckMsg(updateMsg.getChangeNumber());
+ session.publish(ackMsg);
+ break;
+ case TIMEOUT_DS_SCENARIO:
+ // Let timeout occur
+ break;
+ case REPLAY_ERROR_DS_SCENARIO:
+ // Send the ack with replay error
+ ackMsg = new AckMsg(updateMsg.getChangeNumber());
+ ackMsg.setHasReplayError(true);
+ List<Short> failedServers = new ArrayList<Short>();
+ failedServers.add(getServerId());
+ ackMsg.setFailedServers(failedServers);
+ session.publish(ackMsg);
+ break;
+ default:
+ fail("Unknown scenario: " + scenario);
+ }
+ return true;
+ } catch (IOException ex)
+ {
+ fail("IOException in fake replication domain " + getServerId() + " :" +
+ ex.getMessage());
+ return false;
+ }
+ }
+
+ /**
+ * Check that received update assured parameters are as defined at DS start
+ */
+ private void checkUpdateAssuredParameters(UpdateMsg updateMsg)
+ {
+ boolean ok = true;
+ if (updateMsg.isAssured() != isAssured())
+ {
+ debugInfo("Fake DS " + getServerId() + " received update assured flag is wrong: " + updateMsg);
+ ok = false;
+ }
+ if (updateMsg.getAssuredMode() != getAssuredMode())
+ {
+ debugInfo("Fake DS " + getServerId() + " received update assured mode is wrong: " + updateMsg);
+ ok = false;
+ }
+ if (updateMsg.getSafeDataLevel() != getAssuredSdLevel())
+ {
+ debugInfo("Fake DS " + getServerId() + " received update assured sd level is wrong: " + updateMsg);
+ ok = false;
+ }
+
+ if (ok)
+ debugInfo("Fake DS " + getServerId() + " received update assured parameters are ok: " + updateMsg);
+ else
+ everyUpdatesAreOk = false;
+ }
+
+ /**
+ * Sends a new update from this DS
+ * @throws TimeoutException If timeout waiting for an assured ack
+ */
+ public void sendNewFakeUpdate() throws TimeoutException
+ {
+
+ // Create a new delete update message (the simplest to create)
+ DeleteMsg delMsg = new DeleteMsg(getServiceID(), gen.newChangeNumber(),
+ UUID.randomUUID().toString());
+
+ // Send it (this uses the defined assured conf at constructor time)
+ publish(delMsg);
+ }
+ }
+
+ /**
+ * The fake replication server used to emulate RS behaviour the way we want
+ * for assured features test.
+ * This fake replication server is able to receive another RS connection only.
+ * According to the configured scenario, it will answer to updates with acks
+ * as the scenario is requesting.
+ */
+ private static int fakePort = 0;
+
+ private class FakeReplicationServer extends Thread
+ {
+
+ private boolean shutdown = false;
+ private ProtocolSession session = null;
+
+ // Parameters given at constructor time
+ private int port;
+ private short serverId = -1;
+ boolean isAssured = false; // Default value for config
+ AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; // Default value for config
+ byte safeDataLevel = (byte) 1; // Default value for config
+ private String baseDn = null;
+ private long generationId = -1L;
+ private byte groupId = (byte) -1;
+ private boolean sslEncryption = false;
+ // The scenario this RS is expecting
+ private int scenario = -1;
+
+ private ChangeNumberGenerator gen = null;
+
+ // False if a received update had assured parameters not as expected
+ private boolean everyUpdatesAreOk = true;
+ // Number of received updates
+ private int nReceivedUpdates = 0;
+
+ // True is an ack has been replied to a received assured update (in assured mode of course)
+ // used in reply scenario
+ private boolean ackReplied = false;
+
+ /**
+ * Creates a fake replication server
+ * @param port port of the real RS we will connect to
+ * @param serverId our server id
+ * @param assured do we expect incoming assured updates (also used for outgoing updates)
+ * @param assuredMode the expected assured mode of the incoming updates (also used for outgoing updates)
+ * @param safeDataLevel the expected safe data level of the incoming updates (also used for outgoing updates)
+ * @param groupId our group id
+ * @param baseDn the basedn we connect with, to the real RS
+ * @param generationId the generation id we use at connection to real RS
+ */
+ public FakeReplicationServer(int port, short serverId, boolean assured,
+ AssuredMode assuredMode, int safeDataLevel,
+ byte groupId, String baseDn, long generationId)
+ {
+ this.port = port;
+ this.serverId = serverId;
+ this.baseDn = baseDn;
+ this.generationId = generationId;
+ this.groupId = groupId;
+ this.isAssured = assured;
+ this.assuredMode = assuredMode;
+ this.safeDataLevel = (byte) safeDataLevel;
+
+ gen = new ChangeNumberGenerator((short)(serverId + 10), 0L);
+ }
+
+ /*
+ * Make the RS send an assured message and return the ack
+ * message it receives from the RS
+ */
+ public AckMsg sendNewFakeUpdate() throws SocketTimeoutException
+ {
+ try
+ {
+
+ // Create a new delete update message (the simplest to create)
+ DeleteMsg delMsg = new DeleteMsg(baseDn, gen.newChangeNumber(),
+ UUID.randomUUID().toString());
+
+ // Send del message in assured mode
+ delMsg.setAssured(isAssured);
+ delMsg.setAssuredMode(assuredMode);
+ delMsg.setSafeDataLevel(safeDataLevel);
+ session.publish(delMsg);
+
+ // Read and return matching ack
+ AckMsg ackMsg = null;
+ ReplicationMsg replMsg = session.receive();
+ if (replMsg instanceof ErrorMsg)
+ {
+ // Support for connection done with bad gen id : we receive an error
+ // message that we must throw away before reading our ack.
+ replMsg = session.receive();
+ }
+ ackMsg = (AckMsg)replMsg;
+
+ return ackMsg;
+
+ } catch(SocketTimeoutException e)
+ {
+ throw e;
+ } catch (Throwable t)
+ {
+ fail("Unexpected exception in fake replication server sendNewFakeUpdate " +
+ "processing: " + t);
+ return null;
+ }
+ }
+
+ /**
+ * Connect to RS
+ * Returns true if connection was made successfuly
+ */
+ public boolean connect()
+ {
+ try
+ {
+ // Create and connect socket
+ InetSocketAddress serverAddr =
+ new InetSocketAddress("localhost", port);
+ Socket socket = new Socket();
+ socket.setTcpNoDelay(true);
+ socket.connect(serverAddr, 500);
+
+ // Create client session
+ fakePort++;
+ String fakeUrl = "localhost:" + fakePort;
+ ReplSessionSecurity replSessionSecurity = new ReplSessionSecurity();
+ session = replSessionSecurity.createClientSession(fakeUrl, socket,
+ ReplSessionSecurity.HANDSHAKE_TIMEOUT);
+
+ // Send our repl server start msg
+ ReplServerStartMsg replServerStartMsg = new ReplServerStartMsg(serverId,
+ fakeUrl, baseDn, 100, new ServerState(),
+ ProtocolVersion.getCurrentVersion(), generationId, sslEncryption,
+ groupId, 5000);
+ session.publish(replServerStartMsg);
+
+ // Read repl server start msg
+ ReplServerStartMsg inReplServerStartMsg = (ReplServerStartMsg) session.
+ receive();
+
+ sslEncryption = inReplServerStartMsg.getSSLEncryption();
+ if (!sslEncryption)
+ {
+ session.stopEncryption();
+ }
+
+ // Send our topo mesg
+ RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
+ List<RSInfo> rsInfos = new ArrayList<RSInfo>();
+ rsInfos.add(rsInfo);
+ TopologyMsg topoMsg = new TopologyMsg(null, rsInfos);
+ session.publish(topoMsg);
+
+ // Read topo msg
+ TopologyMsg inTopoMsg = (TopologyMsg) session.receive();
+ debugInfo("Fake RS " + serverId + " handshake received the following info:" + inTopoMsg);
+
+ } catch (Throwable ex)
+ {
+ fail("Could not connect to replication server. Error in RS " + serverId +
+ " :" + ex.getMessage());
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Starts the fake RS, expecting and testing the passed scenario.
+ */
+ public void start(int scenario)
+ {
+
+ // Store expected test case
+ this.scenario = scenario;
+
+ if (scenario == SENDER_RS_SCENARIO)
+ {
+ // Do not start the listening thread and let the main thread receive
+ // receive acks in sendNewFakeUpdate()
+ return;
+ }
+
+ // Start listening
+ start();
+ }
+
+ /**
+ * Wait for DS connections
+ */
+ @Override
+ public void run()
+ {
+ try
+ {
+ // Loop receiving and treating updates
+ while (!shutdown)
+ {
+ try
+ {
+ ReplicationMsg replicationMsg = session.receive();
+
+ if (!(replicationMsg instanceof UpdateMsg))
+ {
+ debugInfo("Fake RS " + serverId + " received non update message: " +
+ replicationMsg);
+ continue;
+ }
+
+ UpdateMsg updateMsg = (UpdateMsg) replicationMsg;
+ checkUpdateAssuredParameters(updateMsg);
+ nReceivedUpdates++;
+
+ // Now execute the requested scenario
+ switch (scenario)
+ {
+ case REPLY_OK_RS_SCENARIO:
+ if (updateMsg.isAssured())
+ {
+ // Send the ack without errors
+ AckMsg ackMsg = new AckMsg(updateMsg.getChangeNumber());
+ session.publish(ackMsg);
+ ackReplied = true;
+ }
+ break;
+ case TIMEOUT_RS_SCENARIO:
+ // Let timeout occur
+ break;
+ default:
+ fail("Unknown scenario: " + scenario);
+ }
+ } catch (SocketTimeoutException toe)
+ {
+ // We may timeout reading, in this case just re-read
+ debugInfo("Fake RS " + serverId + " : " + toe.
+ getMessage() + " (this is normal)");
+ }
+ }
+ } catch (Throwable th)
+ {
+ debugInfo("Terminating thread of fake RS " + serverId + " :" + th.
+ getMessage());
+ // Probably thread closure from main thread
+ }
+ }
+
+ /**
+ * Shutdown the Replication Server service and all its connections.
+ */
+ public void shutdown()
+ {
+ if (shutdown)
+ {
+ return;
+ }
+
+ shutdown = true;
+
+ /*
+ * Shutdown any current client handling code
+ */
+ try
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ } catch (IOException e)
+ {
+ // ignore.
+ }
+
+ try
+ {
+ join();
+ } catch (InterruptedException ie)
+ {
+ }
+ }
+
+ /**
+ * Check that received update assured parameters are as defined at DS start
+ */
+ private void checkUpdateAssuredParameters(UpdateMsg updateMsg)
+ {
+ boolean ok = true;
+ if (updateMsg.isAssured() != isAssured)
+ {
+ debugInfo("Fake RS " + serverId + " received update assured flag is wrong: " + updateMsg);
+ ok = false;
+ }
+ if (updateMsg.getAssuredMode() != assuredMode)
+ {
+ debugInfo("Fake RS " + serverId + " received update assured mode is wrong: " + updateMsg);
+ ok = false;
+ }
+ if (updateMsg.getSafeDataLevel() != safeDataLevel)
+ {
+ debugInfo("Fake RS " + serverId + " received update assured sd level is wrong: " + updateMsg);
+ ok = false;
+ }
+
+ if (ok)
+ debugInfo("Fake RS " + serverId + " received update assured parameters are ok: " + updateMsg);
+ else
+ everyUpdatesAreOk = false;
+ }
+
+ public boolean receivedUpdatesOk()
+ {
+ return everyUpdatesAreOk;
+ }
+
+ public int nReceivedUpdates()
+ {
+ return nReceivedUpdates;
+ }
+
+ /**
+ * Test if the last received updates was acknowledged
+ * WARNING: this must be called once per update as it also immediatly resets the status
+ * for a new test for the next update
+ * @return True if acknowledged
+ */
+ public boolean ackReplied()
+ {
+ boolean result = ackReplied;
+ // reset ack replied status
+ ackReplied = false;
+ return result;
+ }
+ }
+
+ /**
+ * Sleep a while
+ */
+ private void sleep(long time)
+ {
+ try
+ {
+ Thread.sleep(time);
+ } catch (InterruptedException ex)
+ {
+ fail("Error sleeping " + ex);
+ }
+ }
+
+ /**
+ * See testSafeDataLevelOne comment.
+ * This is a facility to run the testSafeDataLevelOne in precommit in simplest
+ * case, so that precommit run test something and is not long.
+ * testSafeDataLevelOne will run in nightly tests (groups = "slow")
+ */
+ @Test(enabled = true)
+ public void testSafeDataLevelOnePrecommit() throws Exception
+ {
+ testSafeDataLevelOne(DEFAULT_GID, false, false, DEFAULT_GID, DEFAULT_GID);
+ }
+
+ /**
+ * Returns possible combinations of parameters for testSafeDataLevelOne test
+ */
+ @DataProvider(name = "testSafeDataLevelOneProvider")
+ private Object[][] testSafeDataLevelOneProvider()
+ {
+ return new Object[][]
+ {
+ { DEFAULT_GID, false, false, DEFAULT_GID, DEFAULT_GID},
+ { DEFAULT_GID, false, false, OTHER_GID, DEFAULT_GID},
+ { DEFAULT_GID, false, false, DEFAULT_GID, OTHER_GID},
+ { DEFAULT_GID, false, false, OTHER_GID, OTHER_GID},
+ { DEFAULT_GID, true, false, DEFAULT_GID, DEFAULT_GID},
+ { DEFAULT_GID, true, false, OTHER_GID, DEFAULT_GID},
+ { DEFAULT_GID, true, false, DEFAULT_GID, OTHER_GID},
+ { DEFAULT_GID, true, false, OTHER_GID, OTHER_GID},
+ { DEFAULT_GID, false, true, DEFAULT_GID, DEFAULT_GID},
+ { DEFAULT_GID, false, true, OTHER_GID, DEFAULT_GID},
+ { DEFAULT_GID, false, true, DEFAULT_GID, OTHER_GID},
+ { DEFAULT_GID, false, true, OTHER_GID, OTHER_GID},
+ { DEFAULT_GID, true, true, DEFAULT_GID, DEFAULT_GID},
+ { DEFAULT_GID, true, true, OTHER_GID, DEFAULT_GID},
+ { DEFAULT_GID, true, true, DEFAULT_GID, OTHER_GID},
+ { DEFAULT_GID, true, true, OTHER_GID, OTHER_GID},
+ { OTHER_GID, false, false, DEFAULT_GID, DEFAULT_GID},
+ { OTHER_GID, false, false, OTHER_GID, DEFAULT_GID},
+ { OTHER_GID, false, false, DEFAULT_GID, OTHER_GID},
+ { OTHER_GID, false, false, OTHER_GID, OTHER_GID},
+ { OTHER_GID, true, false, DEFAULT_GID, DEFAULT_GID},
+ { OTHER_GID, true, false, OTHER_GID, DEFAULT_GID},
+ { OTHER_GID, true, false, DEFAULT_GID, OTHER_GID},
+ { OTHER_GID, true, false, OTHER_GID, OTHER_GID},
+ { OTHER_GID, false, true, DEFAULT_GID, DEFAULT_GID},
+ { OTHER_GID, false, true, OTHER_GID, DEFAULT_GID},
+ { OTHER_GID, false, true, DEFAULT_GID, OTHER_GID},
+ { OTHER_GID, false, true, OTHER_GID, OTHER_GID},
+ { OTHER_GID, true, true, DEFAULT_GID, DEFAULT_GID},
+ { OTHER_GID, true, true, OTHER_GID, DEFAULT_GID},
+ { OTHER_GID, true, true, DEFAULT_GID, OTHER_GID},
+ { OTHER_GID, true, true, OTHER_GID, OTHER_GID}
+ };
+ }
+
+ /**
+ * Test that the RS is able to acknowledge SD updates sent by SD, with level 1.
+ * - 1 main fake DS connected to 1 RS, with same GID as RS or not
+ * - 1 optional other fake DS connected to RS, with same GID as RS or not
+ * - 1 optional other fake RS connected to RS, with same GID as RS or not
+ * All possible combinations tested thanks to the provider
+ */
+ @Test(dataProvider = "testSafeDataLevelOneProvider", groups = "slow", enabled = true)
+ public void testSafeDataLevelOne(int mainDsGid, boolean otherFakeDS, boolean fakeRS, int otherFakeDsGid, int fakeRsGid) throws Exception
+ {
+ String testCase = "testSafeDataLevelOne";
+
+ debugInfo("Starting " + testCase);
+
+ initTest();
+
+ try
+ {
+ /*
+ * Start real RS (the one to be tested)
+ */
+
+ // Create real RS 1
+ rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
+ testCase);
+ assertNotNull(rs1);
+
+ /*
+ * Start main DS (the one which sends updates)
+ */
+
+ // Create and connect fake domain 1 to RS 1
+ // Assured mode: SD, level 1
+ fakeRd1 = createFakeReplicationDomain(FDS1_ID, mainDsGid, RS1_ID,
+ DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT,
+ TIMEOUT_DS_SCENARIO);
+ assertNotNull(fakeRd1);
+
+ /*
+ * Start one other fake DS
+ */
+
+ // Put another fake domain connected to real RS ?
+ if (otherFakeDS)
+ {
+ // Assured set to false as RS should forward change without assured requested
+ // Timeout scenario used so that no reply is made if however the real RS
+ // by mistake sends an assured error and expects an ack from this DS:
+ // this would timeout. If main DS group id is not the same as the real RS one,
+ // the update will even not come to real RS as asured
+ fakeRd2 = createFakeReplicationDomain(FDS2_ID, otherFakeDsGid, RS1_ID,
+ DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT,
+ TIMEOUT_DS_SCENARIO);
+ assertNotNull(fakeRd2);
+ }
+
+ /*
+ * Start 1 fake Rs
+ */
+
+ // Put a fake RS connected to real RS ?
+ if (fakeRS)
+ {
+ // Assured set to false as RS should forward change without assured requested
+ // Timeout scenario used so that no reply is made if however the real RS
+ // by mistake sends an assured error and expects an ack from this fake RS:
+ // this would timeout. If main DS group id is not the same as the real RS one,
+ // the update will even not come to real RS as asured
+ fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRsGid, RS1_ID,
+ DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, TIMEOUT_RS_SCENARIO);
+ assertNotNull(fakeRs1);
+ }
+
+ // Send update from DS 1
+ long startTime = System.currentTimeMillis();
+ try
+ {
+ fakeRd1.sendNewFakeUpdate();
+ } catch (TimeoutException e)
+ {
+ fail("No timeout is expected here");
+ }
+
+ // Check call time (should have last a lot less than long timeout)
+ // (ack received if group id of DS and real RS are the same, no ack requested
+ // otherwise)
+ long sendUpdateTime = System.currentTimeMillis() - startTime;
+ assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ if (mainDsGid == DEFAULT_GID)
+ {
+ // Check monitoring values (check that ack has been correctly received)
+ assertEquals(fakeRd1.getAssuredSdSentUpdates(), 1);
+ assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), 1);
+ assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSdServerTimeoutUpdates().size(), 0);
+ } else
+ {
+ // Check monitoring values (DS group id (OTHER_GID) is not the same as RS one
+ // (DEFAULT_GID) so update should have been sent in normal mode
+ assertEquals(fakeRd1.getAssuredSdSentUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSdServerTimeoutUpdates().size(), 0);
+ }
+
+ // Sanity check
+ sleep(1000); // Let time to update to reach other servers
+ assertEquals(fakeRd1.nReceivedUpdates(), 0);
+ assertTrue(fakeRd1.receivedUpdatesOk());
+ if (otherFakeDS)
+ {
+ assertEquals(fakeRd2.nReceivedUpdates(), 1);
+ assertTrue(fakeRd2.receivedUpdatesOk());
+ }
+ if (fakeRS)
+ {
+ assertEquals(fakeRs1.nReceivedUpdates(), 1);
+ assertTrue(fakeRs1.receivedUpdatesOk());
+ }
+ } finally
+ {
+ endTest();
+ }
+ }
+
+ /**
+ * Returns possible combinations of parameters for testSafeDataLevelHighPrecommit test
+ */
+ @DataProvider(name = "testSafeDataLevelHighPrecommitProvider")
+ private Object[][] testSafeDataLevelHighPrecommitProvider()
+ {
+ return new Object[][]
+ {
+
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO}
+ };
+ }
+
+ /**
+ * See testSafeDataLevelHigh comment.
+ */
+ @Test(dataProvider = "testSafeDataLevelHighPrecommitProvider", enabled = true)
+ public void testSafeDataLevelHighPrecommit(int sdLevel, boolean otherFakeDS, int otherFakeDsGid, long otherFakeDsGenId,
+ int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen,
+ int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen) throws Exception
+ {
+ testSafeDataLevelHigh(sdLevel, otherFakeDS, otherFakeDsGid, otherFakeDsGenId,
+ fakeRs1Gid, fakeRs1GenId, fakeRs1Scen, fakeRs2Gid, fakeRs2GenId, fakeRs2Scen,
+ fakeRs3Gid, fakeRs3GenId, fakeRs3Scen);
+ }
+
+ /**
+ * Returns possible combinations of parameters for testSafeDataLevelHighNightly test
+ */
+ @DataProvider(name = "testSafeDataLevelHighNightlyProvider")
+ private Object[][] testSafeDataLevelHighNightlyProvider()
+ {
+ return new Object[][]
+ {
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, TIMEOUT_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, TIMEOUT_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+ { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO}
+
+ };
+ }
+
+ /**
+ * See testSafeDataLevelHigh comment.
+ */
+ @Test(dataProvider = "testSafeDataLevelHighNightlyProvider", groups = "slow", enabled = true)
+ public void testSafeDataLevelHighNightly(int sdLevel, boolean otherFakeDS, int otherFakeDsGid, long otherFakeDsGenId,
+ int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen,
+ int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen) throws Exception
+ {
+ testSafeDataLevelHigh(sdLevel, otherFakeDS, otherFakeDsGid, otherFakeDsGenId,
+ fakeRs1Gid, fakeRs1GenId, fakeRs1Scen, fakeRs2Gid, fakeRs2GenId, fakeRs2Scen,
+ fakeRs3Gid, fakeRs3GenId, fakeRs3Scen);
+ }
+
+ /**
+ * Returns possible combinations of parameters for testSafeDataLevelHigh test
+ */
+ @DataProvider(name = "testSafeDataLevelHighProvider")
+ private Object[][] testSafeDataLevelHighProvider()
+ {
+ // Constrcut all possible combinations of parameters
+ List<List<Object>> objectArrayList = new ArrayList<List<Object>>();
+
+ // Safe Data Level
+ objectArrayList = addPossibleParameters(objectArrayList, 2, 3);
+ // Other fake DS
+ objectArrayList = addPossibleParameters(objectArrayList, true, false);
+ // Other fake DS group id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+ // Other fake DS generation id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+ // Fake RS 1 group id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+ // Fake RS 1 generation id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+ // Fake RS 1 scenario
+ objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO);
+ // Fake RS 2 group id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+ // Fake RS 2 generation id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+ // Fake RS 2 scenario
+ objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO);
+ // Fake RS 3 group id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+ // Fake RS 3 generation id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+ // Fake RS 3 scenario
+ objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO);
+
+ Object[][] result = new Object[objectArrayList.size()][];
+ int i = 0;
+ for (List<Object> objectArray : objectArrayList)
+ {
+ result[i] = objectArray.toArray();
+ i++;
+ }
+
+ debugInfo("testSafeDataLevelHighProvider: number of possible parameter combinations : " + i);
+
+ return result;
+ }
+
+ // Helper for providers:
+ // Modify the passed object array list adding to each already contained object array each
+ // passed possible values.
+ // Example: to create all possible parameter combinations for a test method which has 2 parameters:
+ // one boolean then an integer, with both 2 possible values: {true|false} and {10|100}:
+ //
+ // List<List<Object>> objectArrayList = new ArrayList<List<Object>>();
+ // // Possible bolean values
+ // objectArrayList = addPossibleParameters(objectArrayList, true, false);
+ // // Possible integer values
+ // objectArrayList = addPossibleParameters(objectArrayList, 10, 100);
+ // Object[][] result = new Object[objectArrayList.size()][];
+ // int i = 0;
+ // for (List<Object> objectArray : objectArrayList)
+ // {
+ // result[i] = objectArray.toArray();
+ // i++;
+ // }
+ // return result;
+ //
+ // The provider will return the equivalent following Object[][]:
+ // new Object[][]
+ // {
+ // { true, 10},
+ // { true, 100},
+ // { false, 10},
+ // { false, 100}
+ // };
+ private List<List<Object>> addPossibleParameters(List<List<Object>> objectArrayList, Object... possibleParameters)
+ {
+ List<List<Object>> newObjectArrayList = new ArrayList<List<Object>>();
+
+ if (objectArrayList.size() == 0)
+ {
+ // First time we add some parameters, create first object arrays
+ // Add each possible parameter as initial parameter lists
+ for (Object possibleParameter : possibleParameters)
+ {
+ // Create new empty list
+ List<Object> newObjectArray = new ArrayList<Object>();
+ // Add the new possible parameter
+ newObjectArray.add(possibleParameter);
+ // Store the new object array in the result list
+ newObjectArrayList.add(newObjectArray);
+ }
+ return newObjectArrayList;
+ }
+
+ for (List<Object> objectArray : objectArrayList)
+ {
+ // Add each possible parameter to the already existing list
+ for (Object possibleParameter : possibleParameters)
+ {
+ // Clone the existing object array
+ List<Object> newObjectArray = new ArrayList<Object>();
+ for (Object object : objectArray)
+ {
+ newObjectArray.add(object);
+ }
+ // Add the new possible parameter
+ newObjectArray.add(possibleParameter);
+ // Store the new object array in the result list
+ newObjectArrayList.add(newObjectArray);
+ }
+ }
+
+ return newObjectArrayList;
+ }
+
+ /**
+ * Test that the RS is able to acknowledge SD updates with level higher than 1
+ * and also to return errors is some servers timeout.
+ * - 1 main fake DS connected to 1 RS
+ * - 1 optional other fake DS connected to RS, with same GID as RS or not and same GENID as RS or not
+ * - 3 optional other fake RSs connected to RS, with same GID as RS or not and same GENID as RS or not
+ * All possible combinations tested thanks to the provider.
+ * Fake RSs shutting down 1 after 1 to go from 3 available servers to 0. One update sent at each step.
+ *
+ * NOTE: the following unit test is disabled by default as its testSafeDataLevelHighProvider provider
+ * provides every possible combinations of parameters. This test runs then for hours. We keep this provider
+ * for occasional testing but we disable it.
+ * A simpler set of parameters is instead used in enabled test methods (which run this method in fact):
+ * - testSafeDataLevelHighPrecommit which is used for precommit and runs fast
+ * - testSafeDataLevelHighNightly which is used in nightly tests and takes more time to execute
+ */
+ @Test(dataProvider = "testSafeDataLevelHighProvider", enabled = false)
+ public void testSafeDataLevelHigh(int sdLevel, boolean otherFakeDS, int otherFakeDsGid, long otherFakeDsGenId,
+ int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen,
+ int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen) throws Exception
+ {
+ String testCase = "testSafeDataLevelHigh";
+
+ debugInfo("Starting " + testCase);
+
+ assertTrue(sdLevel > 1);
+ int nWishedServers = sdLevel - 1; // Number of fake RSs we want an ack from
+
+ initTest();
+
+ try
+ {
+ /*
+ * Start real RS (the one to be tested)
+ */
+
+ // Create real RS 1
+ rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
+ testCase);
+ assertNotNull(rs1);
+
+ /*
+ * Start main DS (the one which sends updates)
+ */
+
+ // Create and connect fake domain 1 to RS 1
+ fakeRd1 = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
+ DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
+ TIMEOUT_DS_SCENARIO);
+ assertNotNull(fakeRd1);
+
+ /*
+ * Start one other fake DS
+ */
+
+ // Put another fake domain connected to real RS ?
+ if (otherFakeDS)
+ {
+ fakeRd2 = createFakeReplicationDomain(FDS2_ID, otherFakeDsGid, RS1_ID,
+ otherFakeDsGenId, false, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
+ TIMEOUT_DS_SCENARIO);
+ assertNotNull(fakeRd2);
+ }
+
+ /*
+ * Start 3 fake Rss
+ */
+
+ // Put a fake RS 1 connected to real RS
+ fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRs1Gid, RS1_ID,
+ fakeRs1GenId, ((fakeRs1Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
+ fakeRs1Scen);
+ assertNotNull(fakeRs1);
+
+ // Put a fake RS 2 connected to real RS
+ fakeRs2 = createFakeReplicationServer(FRS2_ID, fakeRs2Gid, RS1_ID,
+ fakeRs2GenId, ((fakeRs2Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
+ fakeRs2Scen);
+ assertNotNull(fakeRs2);
+
+ // Put a fake RS 3 connected to real RS
+ fakeRs3 = createFakeReplicationServer(FRS3_ID, fakeRs3Gid, RS1_ID,
+ fakeRs3GenId, ((fakeRs3Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
+ fakeRs3Scen);
+ assertNotNull(fakeRs3);
+
+ // Wait for connections to be finished
+ // DS must see expected numbers of fake DSs and RSs
+ waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 3);
+
+ /***********************************************************************
+ * Send update from DS 1 (3 fake RSs available) and check what happened
+ ***********************************************************************/
+
+ // Keep track of monitoring values for incremental test step
+ int acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
+ int timeoutUpdates = fakeRd1.getAssuredSdTimeoutUpdates();
+ Map<Short,Integer> serverErrors = fakeRd1.getAssuredSdServerTimeoutUpdates();
+ // Compute the list of servers that are elligible for receiving an assured update
+ List<Short> elligibleServers = computeElligibleServersSafeData(fakeRs1Gid, fakeRs1GenId, fakeRs2Gid, fakeRs2GenId, fakeRs3Gid, fakeRs3GenId);
+ // Compute the list of servers that are elligible for receiving an assured update and that are expected to effectively ack the update
+ List<Short> expectedServers = computeExpectedServersSafeData(fakeRs1Gid, fakeRs1GenId, fakeRs1Scen, fakeRs2Gid, fakeRs2GenId, fakeRs2Scen, fakeRs3Gid, fakeRs3GenId, fakeRs3Scen);
+
+ // Send update
+ long startTime = System.currentTimeMillis();
+ try
+ {
+ fakeRd1.sendNewFakeUpdate();
+ } catch (TimeoutException e)
+ {
+ fail("No timeout is expected here");
+ }
+ long sendUpdateTime = System.currentTimeMillis() - startTime;
+
+ // Check
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+ checkTimeAndMonitoringSafeData(1, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
+ checkWhatHasBeenReceived(1, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, fakeRs3GenId, expectedServers);
+
+ /***********************************************************************
+ * Send update from DS 1 (2 fake RSs available) and check what happened
+ ***********************************************************************/
+
+ // Shutdown fake RS 3
+ fakeRs3.shutdown();
+ fakeRs3 = null;
+
+ // Wait for disconnection to be finished
+ // DS must see expected numbers of fake DSs and RSs
+ waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 2);
+
+ // Keep track of monitoring values for incremental test step
+ acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
+ timeoutUpdates = fakeRd1.getAssuredSdTimeoutUpdates();
+ serverErrors = fakeRd1.getAssuredSdServerTimeoutUpdates();
+ // Compute the list of servers that are elligible for receiving an assured update
+ elligibleServers = computeElligibleServersSafeData(fakeRs1Gid, fakeRs1GenId, fakeRs2Gid, fakeRs2GenId, -1, -1L);
+ // Compute the list of servers that are elligible for receiving an assured update and that are expected to effectively ack the update
+ expectedServers = computeExpectedServersSafeData(fakeRs1Gid, fakeRs1GenId, fakeRs1Scen, fakeRs2Gid, fakeRs2GenId, fakeRs2Scen, -1, -1L, -1);
+
+ // Send update
+ startTime = System.currentTimeMillis();
+ try
+ {
+ fakeRd1.sendNewFakeUpdate();
+ } catch (TimeoutException e)
+ {
+ fail("No timeout is expected here");
+ }
+ sendUpdateTime = System.currentTimeMillis() - startTime;
+
+ // Check
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+ checkTimeAndMonitoringSafeData(2, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
+ checkWhatHasBeenReceived(2, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, -1L, expectedServers);
+
+ /***********************************************************************
+ * Send update from DS 1 (1 fake RS available) and check what happened
+ ***********************************************************************/
+
+ // Shutdown fake RS 2
+ fakeRs2.shutdown();
+ fakeRs2 = null;
+
+ // Wait for disconnection to be finished
+ // DS must see expected numbers of fake DSs and RSs
+ waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 1);
+
+ // Keep track of monitoring values for incremental test step
+ acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
+ timeoutUpdates = fakeRd1.getAssuredSdTimeoutUpdates();
+ serverErrors = fakeRd1.getAssuredSdServerTimeoutUpdates();
+ // Compute the list of servers that are elligible for receiving an assured update
+ elligibleServers = computeElligibleServersSafeData(fakeRs1Gid, fakeRs1GenId, -1, -1L, -1, -1L);
+ // Compute the list of servers that are elligible for receiving an assured update and that are expected to effectively ack the update
+ expectedServers = computeExpectedServersSafeData(fakeRs1Gid, fakeRs1GenId, fakeRs1Scen, -1, -1L, -1, -1, -1L, -1);
+
+ // Send update
+ startTime = System.currentTimeMillis();
+ try
+ {
+ fakeRd1.sendNewFakeUpdate();
+ } catch (TimeoutException e)
+ {
+ fail("No timeout is expected here");
+ }
+ sendUpdateTime = System.currentTimeMillis() - startTime;
+
+ // Check
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+ checkTimeAndMonitoringSafeData(3, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
+ checkWhatHasBeenReceived(3, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, -1L, -1L, expectedServers);
+
+ /***********************************************************************
+ * Send update from DS 1 (no fake RS available) and check what happened
+ ***********************************************************************/
+
+ // Shutdown fake RS 1
+ fakeRs1.shutdown();
+ fakeRs1 = null;
+
+ // Wait for disconnection to be finished
+ // DS must see expected numbers of fake DSs and RSs
+ waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 0);
+
+ // Keep track of monitoring values for incremental test step
+ acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
+ timeoutUpdates = fakeRd1.getAssuredSdTimeoutUpdates();
+ serverErrors = fakeRd1.getAssuredSdServerTimeoutUpdates();
+ // Compute the list of servers that are elligible for receiving an assured update
+ elligibleServers = computeElligibleServersSafeData(-1, -1L, -1, -1L, -1, -1L);
+ // Compute the list of servers that are elligible for receiving an assured update and that are expected to effectively ack the update
+ expectedServers = computeExpectedServersSafeData(-1, -1L, -1, -1, -1L, -1, -1, -1L, -1);
+
+ // Send update
+ startTime = System.currentTimeMillis();
+ try
+ {
+ fakeRd1.sendNewFakeUpdate();
+ } catch (TimeoutException e)
+ {
+ fail("No timeout is expected here");
+ }
+ sendUpdateTime = System.currentTimeMillis() - startTime;
+
+ // Check
+ sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+ checkTimeAndMonitoringSafeData(4, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
+ checkWhatHasBeenReceived(4, otherFakeDS, otherFakeDsGenId, -1L, -1L, -1L, expectedServers);
+ } finally
+ {
+ endTest();
+ }
+ }
+
+ // Check that the DSs and the fake RSs of the topology have received/acked what is expected according to the
+ // test step (the number of updates)
+ // -1 for a gen id means no need to test the matching fake RS
+ private void checkWhatHasBeenReceived(int nSentUpdates, boolean otherFakeDS, long otherFakeDsGenId, long fakeRs1GenId, long fakeRs2GenId, long fakeRs3GenId, List<Short> expectedServers)
+ {
+
+ // We should not receive our own update
+ assertEquals(fakeRd1.nReceivedUpdates(), 0);
+ assertTrue(fakeRd1.receivedUpdatesOk());
+
+ // Check what received other fake DS
+ if (otherFakeDS)
+ {
+ if (otherFakeDsGenId == DEFAULT_GENID)
+ {
+ // Update should have been received
+ assertEquals(fakeRd2.nReceivedUpdates(), nSentUpdates);
+ assertTrue(fakeRd2.receivedUpdatesOk());
+ } else
+ {
+ assertEquals(fakeRd2.nReceivedUpdates(), 0);
+ assertTrue(fakeRd2.receivedUpdatesOk());
+ }
+ }
+
+ // Check what received/did fake Rss
+
+ if (nSentUpdates < 4) // Fake RS 3 is stopped after 3 updates sent
+ {
+ if (fakeRs1GenId != DEFAULT_GENID)
+ assertEquals(fakeRs1.nReceivedUpdates(), 0);
+ else
+ assertEquals(fakeRs1.nReceivedUpdates(), nSentUpdates);
+ assertTrue(fakeRs1.receivedUpdatesOk());
+ if (expectedServers.contains(FRS1_ID))
+ assertTrue(fakeRs1.ackReplied());
+ else
+ assertFalse(fakeRs1.ackReplied());
+ }
+
+ if (nSentUpdates < 3) // Fake RS 3 is stopped after 2 updates sent
+ {
+ if (fakeRs2GenId != DEFAULT_GENID)
+ assertEquals(fakeRs2.nReceivedUpdates(), 0);
+ else
+ assertEquals(fakeRs2.nReceivedUpdates(), nSentUpdates);
+ assertTrue(fakeRs2.receivedUpdatesOk());
+ if (expectedServers.contains(FRS2_ID))
+ assertTrue(fakeRs2.ackReplied());
+ else
+ assertFalse(fakeRs2.ackReplied());
+ }
+
+ if (nSentUpdates < 2) // Fake RS 3 is stopped after 1 update sent
+ {
+ if (fakeRs3GenId != DEFAULT_GENID)
+ assertEquals(fakeRs3.nReceivedUpdates(), 0);
+ else
+ assertEquals(fakeRs3.nReceivedUpdates(), nSentUpdates);
+ assertTrue(fakeRs3.receivedUpdatesOk());
+ if (expectedServers.contains(FRS3_ID))
+ assertTrue(fakeRs3.ackReplied());
+ else
+ assertFalse(fakeRs3.ackReplied());
+ }
+ }
+
+ /**
+ * Check the time the sending of the safe data assured update took and the monitoring
+ * values according to the test configuration
+ */
+ private void checkTimeAndMonitoringSafeData(int nSentUpdates, int prevNAckUpdates, int prevNTimeoutUpdates, Map<Short,Integer> prevNServerErrors, long sendUpdateTime,
+ int nWishedServers, List<Short> elligibleServers, List<Short> expectedServers)
+ {
+ assertEquals(fakeRd1.getAssuredSdSentUpdates(), nSentUpdates);
+ if (elligibleServers.size() >= nWishedServers) // Enough elligible servers
+ {
+ if (expectedServers.size() >= nWishedServers) // Enough servers should ack
+ {
+ // Enough server ok for acking: ack should come back quickly
+ assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+ // Check monitoring values (check that ack has been correctly received)
+ assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates + 1);
+ assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates);
+ checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, null); // Should have same value as previous one
+ } else
+ {
+ // Not enough expected servers: should have timed out in RS timeout
+ // (SMALL_TIMEOUT)
+ assertTrue((SMALL_TIMEOUT <= sendUpdateTime) && (sendUpdateTime <=
+ LONG_TIMEOUT));
+ // Check monitoring values (check that timeout occured)
+ assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates);
+ assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates + 1);
+ // Check that the servers that are elligible but not expected have been added in the error by server list
+ List<Short> expectedServersInError = computeExpectedServersInError(elligibleServers, expectedServers);
+ checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, expectedServersInError);
+ }
+ } else // Not enough elligible servers
+ {
+ if (elligibleServers.size() > 0) // Some elligible servers anyway
+ {
+ if (expectedServers.size() == elligibleServers.size()) // All elligible servers should respond in time
+ {
+ // Enough server ok for acking: ack should come back quickly
+ assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+ // Check monitoring values (check that ack has been correctly received)
+ assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates + 1);
+ assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates);
+ checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, null); // Should have same value as previous one
+ } else
+ { // Some elligible servers should fail
+ // Not enough expected servers: should have timed out in RS timeout
+ // (SMALL_TIMEOUT)
+ assertTrue((SMALL_TIMEOUT <= sendUpdateTime) && (sendUpdateTime <=
+ LONG_TIMEOUT));
+ // Check monitoring values (check that timeout occured)
+ assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates);
+ assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates + 1);
+ // Check that the servers that are elligible but not expected have been added in the error by server list
+ List<Short> expectedServersInError = computeExpectedServersInError(elligibleServers, expectedServers);
+ checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, expectedServersInError);
+ }
+ } else
+ {
+ // No elligible servers at all, RS should not wait for any ack and immediately ack the update
+ assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+ // Check monitoring values (check that ack has been correctly received)
+ assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates + 1);
+ assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates);
+ checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, null); // Should have same value as previous one
+ }
+ }
+ }
+
+ // Compute a list of servers that are elligibles but that are not able to return an ack
+ // (those in elligibleServers that are not in expectedServers). Result may of course be an empty list
+ private List<Short> computeExpectedServersInError(List<Short> elligibleServers, List<Short> expectedServers)
+ {
+ List<Short> expectedServersInError = new ArrayList<Short>();
+ for (Short serverId : elligibleServers)
+ {
+ if (!expectedServers.contains(serverId))
+ expectedServersInError.add(serverId);
+ }
+ return expectedServersInError;
+ }
+
+ // Check that the passed list of errors by server ids is as expected.
+ // - if expectedServersInError is not null and not empty, each server id in measuredServerErrors should have the value it has
+ // in prevServerErrors + 1, or 1 if it was not in prevServerErrors
+ // - if expectedServersInError is null or empty, both map should be equal
+ private void checkServerErrors(Map<Short,Integer> measuredServerErrors, Map<Short,Integer> prevServerErrors, List<Short> expectedServersInError)
+ {
+ if (expectedServersInError != null)
+ {
+ // Adding an error to each server in expectedServersInError, with prevServerErrors as basis, should give the
+ // same map as measuredServerErrors
+ for (Short serverId : expectedServersInError)
+ {
+ Integer prevInt = prevServerErrors.get(serverId);
+ if (prevInt == null)
+ {
+ // Add this server to the list of servers in error
+ prevServerErrors.put(serverId, 1);
+ } else
+ {
+ // Already errors for this server, increment the value
+ int newVal = prevInt.intValue() + 1;
+ prevServerErrors.put(serverId, newVal);
+ }
+ }
+ }
+
+ // Maps should be the same
+ assertEquals(measuredServerErrors.size(), prevServerErrors.size());
+ Set<Short> measuredKeySet = measuredServerErrors.keySet();
+ for (Short serverId : measuredKeySet)
+ {
+ Integer measuredInt = measuredServerErrors.get(serverId);
+ assertNotNull(measuredInt);
+ assertTrue(measuredInt.intValue() != 0);
+ Integer prevInt = prevServerErrors.get(serverId);
+ assertNotNull(prevInt);
+ assertTrue(prevInt.intValue() != 0);
+ assertEquals(measuredInt, prevInt);
+ }
+ }
+
+ /**
+ * Wait until number of fake DSs and fake RSs are available in the topo view of the passed
+ * fake DS or throw an assertion if timeout waiting.
+ */
+ private void waitForStableTopo(FakeReplicationDomain fakeRd, int expectedDs, int expectedRs)
+ {
+ int nSec = 30;
+ int nDs = 0;
+ int nRs = 0;
+ List<DSInfo> dsInfo = null;
+ List<RSInfo> rsInfo = null;
+ while(nSec > 0)
+ {
+ dsInfo = fakeRd.getDsList();
+ rsInfo = fakeRd.getRsList();
+ nDs = dsInfo.size();
+ nRs = rsInfo.size();
+ if ( (nDs == expectedDs) && (nRs == (expectedRs+1)) ) // Must include real RS so '+1'
+ {
+ debugInfo("waitForStableTopo: expected topo obtained after " + (30-nSec) + " second(s).");
+ return;
+ }
+ sleep(1000);
+ nSec--;
+ }
+ fail("Did not reach expected topo view in time: expected " + expectedDs +
+ " DSs (had " + dsInfo +") and " + expectedRs + " RSs (had " + rsInfo +").");
+ }
+
+ // Compute the list of servers that are elligible for receiving an assured update
+ // according to their group id and generation id. If -1 is used, the server is out of scope
+ private List<Short> computeElligibleServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs2Gid, long fakeRs2GenId, int fakeRs3Gid, long fakeRs3GenId)
+ {
+ List<Short> elligibleServers = new ArrayList<Short>();
+ if (areGroupAndGenerationIdOk(fakeRs1Gid, fakeRs1GenId))
+ {
+ elligibleServers.add(FRS1_ID);
+ }
+ if (areGroupAndGenerationIdOk(fakeRs2Gid, fakeRs2GenId))
+ {
+ elligibleServers.add(FRS2_ID);
+ }
+ if (areGroupAndGenerationIdOk(fakeRs3Gid, fakeRs3GenId))
+ {
+ elligibleServers.add(FRS3_ID);
+ }
+ return elligibleServers;
+ }
+
+ // Are group id and generation id ok for being an elligible RS for assured update ?
+ private boolean areGroupAndGenerationIdOk(int fakeRsGid, long fakeRsGenId)
+ {
+ if ((fakeRsGid != -1) && (fakeRsGenId != -1L))
+ {
+ return ( (fakeRsGid == DEFAULT_GID) && (fakeRsGenId == DEFAULT_GENID) );
+ }
+ return false;
+ }
+
+ // Compute the list of servers that are elligible for receiving an assured update and that are expected to effectively ack the update
+ // If -1 is used, the server is out of scope
+ private List<Short> computeExpectedServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen, int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen)
+ {
+ List<Short> exptectedServers = new ArrayList<Short>();
+ if (areGroupAndGenerationIdOk(fakeRs1Gid, fakeRs1GenId))
+ {
+ if (fakeRs1Scen == REPLY_OK_RS_SCENARIO)
+ {
+ exptectedServers.add(FRS1_ID);
+ } else if (fakeRs1Scen != TIMEOUT_RS_SCENARIO)
+ {
+ fail("No other scenario should be used here");
+ return null;
+ }
+ }
+ if (areGroupAndGenerationIdOk(fakeRs2Gid, fakeRs2GenId))
+ {
+ if (fakeRs2Scen == REPLY_OK_RS_SCENARIO)
+ {
+ exptectedServers.add(FRS2_ID);
+ } else if (fakeRs2Scen != TIMEOUT_RS_SCENARIO)
+ {
+ fail("No other scenario should be used here");
+ return null;
+ }
+ }
+ if (areGroupAndGenerationIdOk(fakeRs3Gid, fakeRs3GenId))
+ {
+ if (fakeRs3Scen == REPLY_OK_RS_SCENARIO)
+ {
+ exptectedServers.add(FRS3_ID);
+ } else if (fakeRs3Scen != TIMEOUT_RS_SCENARIO)
+ {
+ fail("No other scenario should be used here");
+ return null;
+ }
+ }
+ return exptectedServers;
+ }
+
+ /**
+ * Returns possible combinations of parameters for testSafeDataFromRS test
+ */
+ @DataProvider(name = "testSafeDataFromRSProvider")
+ private Object[][] testSafeDataFromRSProvider()
+ {
+ List<List<Object>> objectArrayList = new ArrayList<List<Object>>();
+
+ // Safe Data Level
+ objectArrayList = addPossibleParameters(objectArrayList, 1, 2, 3);
+ // Fake RS group id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+ // Fake RS generation id
+ objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+ // Fake RS sends update in assured mode
+ objectArrayList = addPossibleParameters(objectArrayList, true, false);
+
+ Object[][] result = new Object[objectArrayList.size()][];
+ int i = 0;
+ for (List<Object> objectArray : objectArrayList)
+ {
+ result[i] = objectArray.toArray();
+ i++;
+ }
+ return result;
+ }
+
+ /**
+ * Test that the RS is acking or not acking a safe data update sent from another
+ * (fake) RS according to passed parameters
+ */
+ @Test(dataProvider = "testSafeDataFromRSProvider", groups = "slow", enabled = true)
+ public void testSafeDataFromRS(int sdLevel, int fakeRsGid, long fakeRsGenId, boolean sendInAssured) throws Exception
+ {
+ String testCase = "testSafeDataFromRS";
+
+ debugInfo("Starting " + testCase);
+
+ initTest();
+
+ try
+ {
+ /*
+ * Start real RS (the one to be tested)
+ */
+
+ // Create real RS 1
+ rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
+ testCase);
+ assertNotNull(rs1);
+
+ /*
+ * Start fake RS to make the RS have the default generation id
+ */
+
+ // Put a fake RS 2 connected to real RS
+ fakeRs2 = createFakeReplicationServer(FRS2_ID, DEFAULT_GID, RS1_ID,
+ DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 10,
+ TIMEOUT_RS_SCENARIO);
+ assertNotNull(fakeRs2);
+
+ /*
+ * Start fake RS to send updates
+ */
+
+ // Put a fake RS 1 connected to real RS
+ fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRsGid, RS1_ID,
+ fakeRsGenId, sendInAssured, AssuredMode.SAFE_DATA_MODE, sdLevel,
+ SENDER_RS_SCENARIO);
+ assertNotNull(fakeRs1);
+
+ /*
+ * Send an assured update using configured assured parameters
+ */
+
+ long startTime = System.currentTimeMillis();
+ AckMsg ackMsg = null;
+ boolean timeout = false;
+ try
+ {
+ ackMsg = fakeRs1.sendNewFakeUpdate();
+ } catch (SocketTimeoutException e)
+ {
+ debugInfo("testSafeDataFromRS: timeout waiting for update ack");
+ timeout = true;
+ }
+ long sendUpdateTime = System.currentTimeMillis() - startTime;
+ debugInfo("testSafeDataFromRS: send update call time: " + sendUpdateTime);
+
+ /*
+ * Now check timeout or not according to test configuration parameters
+ */
+ if ( (sdLevel == 1) || (fakeRsGid != DEFAULT_GID) ||
+ (fakeRsGenId != DEFAULT_GENID) || (!sendInAssured) )
+ {
+ // Should have timed out (no ack)
+ assertTrue(timeout);
+ assertNull(ackMsg);
+ } else
+ {
+ // Ack should have been received
+ assertFalse(timeout);
+ assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+ assertNotNull(ackMsg);
+ assertFalse(ackMsg.hasTimeout());
+ assertFalse(ackMsg.hasReplayError());
+ assertFalse(ackMsg.hasWrongStatus());
+ assertEquals(ackMsg.getFailedServers().size(), 0);
+ }
+
+ } finally
+ {
+ endTest();
+ }
+ }
+
+ /**
+ * Returns possible combinations of parameters for testSafeDataManyRealRSs test
+ */
+ @DataProvider(name = "testSafeDataManyRealRSsProvider")
+ private Object[][] testSafeDataManyRealRSsProvider()
+ {
+ return new Object[][]
+ {
+ {1},
+ {2},
+ {3},
+ {4}
+ };
+ }
+
+ /**
+ * Test topo of 3 real RSs.
+ * One assured safe data update sent with different safe data level.
+ * Update should always be acked
+ */
+ @Test(dataProvider = "testSafeDataManyRealRSsProvider", enabled = true)
+ public void testSafeDataManyRealRSs(int sdLevel) throws Exception
+ {
+ String testCase = "testSafeDataManyRealRSs";
+
+ debugInfo("Starting " + testCase);
+
+ initTest();
+
+ try
+ {
+
+ /*
+ * Start 3 real RSs
+ */
+
+ // Create real RS 1
+ rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
+ testCase);
+ assertNotNull(rs1);
+
+ // Create real RS 2
+ rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT,
+ testCase);
+ assertNotNull(rs2);
+
+ // Create real RS 3
+ rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT,
+ testCase);
+ assertNotNull(rs3);
+
+ /*
+ * Start DS that will send updates
+ */
+
+ // Wait for RSs to connect together
+ // Create and connect fake domain 1 to RS 1
+ fakeRd1 = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
+ DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
+ TIMEOUT_DS_SCENARIO);
+ assertNotNull(fakeRd1);
+
+ // Wait for RSs connections to be finished
+ // DS must see expected numbers of RSs
+ waitForStableTopo(fakeRd1, 0, 2);
+
+ /*
+ * Send update from DS 1 and check result
+ */
+
+ long startTime = System.currentTimeMillis();
+ try
+ {
+ fakeRd1.sendNewFakeUpdate();
+ } catch (TimeoutException e)
+ {
+ fail("No timeout is expected here");
+ }
+ long sendUpdateTime = System.currentTimeMillis() - startTime;
+
+ // Check call time
+ assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+ // Check monitoring values (check that ack has been correctly received)
+ assertEquals(fakeRd1.getAssuredSdSentUpdates(), 1);
+ assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), 1);
+ assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), 0);
+ assertEquals(fakeRd1.getAssuredSdServerTimeoutUpdates().size(), 0);
+ } finally
+ {
+ endTest();
+ }
+ }
+}
+
--
Gitblit v1.10.0