From 0107a1af4a4325c502fe2632c7d0f4dcf3da98ba Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Tue, 16 Dec 2008 17:03:47 +0000
Subject: [PATCH] Assured Replication: - all unit tests for safe data mode - assured replication code corrections (thanks to safe data unit tests) => Still every unit tests for safe read mode to do...
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java | 47 +++++++++++++++++++++++++++++++++++++++++------
1 files changed, 41 insertions(+), 6 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
index e4ce2c4..da6a716 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
@@ -27,6 +27,9 @@
package org.opends.server.replication.server;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
@@ -61,11 +64,14 @@
* message
* @param safeDataLevel The Safe Data level requested for the assured
* update message
+ * @param expectedServers The list of servers we want an ack from
*/
public SafeDataExpectedAcksInfo(ChangeNumber changeNumber,
- ServerHandler requesterServerHandler, byte safeDataLevel)
+ ServerHandler requesterServerHandler, byte safeDataLevel,
+ List<Short> expectedServers)
{
- super(changeNumber, requesterServerHandler, AssuredMode.SAFE_DATA_MODE);
+ super(changeNumber, requesterServerHandler, AssuredMode.SAFE_DATA_MODE,
+ expectedServers);
this.safeDataLevel = safeDataLevel;
}
@@ -88,11 +94,27 @@
return false;
}
- numReceivedAcks++;
- if (numReceivedAcks == safeDataLevel)
- return true;
- else
+ // Get the ack status for the matching server
+ short ackingServerId = ackingServer.getServerId();
+ boolean ackReceived = expectedServersAckStatus.get(ackingServerId);
+ if (ackReceived)
+ {
+ // Sanity check: this should never happen
+ if (debugEnabled())
+ TRACER.debugInfo("Received unexpected ack from server id: " +
+ ackingServerId + " ack message: " + ackMsg);
return false;
+ } else
+ {
+
+ // Mark this ack received for the server
+ expectedServersAckStatus.put(ackingServerId, true);
+ numReceivedAcks++;
+ if (numReceivedAcks == safeDataLevel)
+ return true;
+ else
+ return false;
+ }
}
/**
@@ -103,7 +125,20 @@
AckMsg ack = new AckMsg(changeNumber);
if (timeout)
+ {
+ // Fill collected errors info
ack.setHasTimeout(true);
+ // Tell wich servers did not send an ack in time
+ List<Short> failedServers = new ArrayList<Short>();
+ Set<Short> serverIds = expectedServersAckStatus.keySet();
+ for (Short serverId : serverIds)
+ {
+ boolean ackReceived = expectedServersAckStatus.get(serverId);
+ if (!ackReceived)
+ failedServers.add(serverId);
+ }
+ ack.setFailedServers(failedServers);
+ }
return ack;
}
--
Gitblit v1.10.0