From 0107a1af4a4325c502fe2632c7d0f4dcf3da98ba Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Tue, 16 Dec 2008 17:03:47 +0000
Subject: [PATCH] Assured Replication: - all unit tests for safe data mode - assured replication code corrections (thanks to safe data unit tests) => Still every unit tests for safe read mode to do...

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java |   47 +++++++++++++++++++++++++++++++++++++++++------
 1 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
index e4ce2c4..da6a716 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
@@ -27,6 +27,9 @@
 
 package org.opends.server.replication.server;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
 import static org.opends.server.loggers.debug.DebugLogger.*;
 
 import org.opends.server.loggers.debug.DebugTracer;
@@ -61,11 +64,14 @@
    * message
    * @param safeDataLevel The Safe Data level requested for the assured
    * update message
+   * @param expectedServers The list of servers we want an ack from
    */
   public SafeDataExpectedAcksInfo(ChangeNumber changeNumber,
-    ServerHandler requesterServerHandler, byte safeDataLevel)
+    ServerHandler requesterServerHandler, byte safeDataLevel,
+    List<Short> expectedServers)
   {
-    super(changeNumber, requesterServerHandler, AssuredMode.SAFE_DATA_MODE);
+    super(changeNumber, requesterServerHandler, AssuredMode.SAFE_DATA_MODE,
+      expectedServers);
     this.safeDataLevel = safeDataLevel;
   }
 
@@ -88,11 +94,27 @@
         return false;
      }
 
-    numReceivedAcks++;
-    if (numReceivedAcks == safeDataLevel)
-      return true;
-    else
+    // Get the ack status for the matching server
+    short ackingServerId = ackingServer.getServerId();
+    boolean ackReceived = expectedServersAckStatus.get(ackingServerId);
+    if (ackReceived)
+    {
+      // Sanity check: this should never happen
+      if (debugEnabled())
+        TRACER.debugInfo("Received unexpected ack from server id: " +
+          ackingServerId + " ack message: " + ackMsg);
       return false;
+    } else
+    {
+
+      // Mark this ack received for the server
+      expectedServersAckStatus.put(ackingServerId, true);
+      numReceivedAcks++;
+      if (numReceivedAcks == safeDataLevel)
+        return true;
+      else
+        return false;
+    }
   }
 
   /**
@@ -103,7 +125,20 @@
     AckMsg ack = new AckMsg(changeNumber);
 
     if (timeout)
+    {
+      // Fill collected errors info
       ack.setHasTimeout(true);
+      // Tell wich servers did not send an ack in time
+      List<Short> failedServers = new ArrayList<Short>();
+      Set<Short> serverIds = expectedServersAckStatus.keySet();
+      for (Short serverId : serverIds)
+      {
+        boolean ackReceived = expectedServersAckStatus.get(serverId);
+        if (!ackReceived)
+          failedServers.add(serverId);
+      }
+      ack.setFailedServers(failedServers);
+    }
 
     return ack;
   }

--
Gitblit v1.10.0