From 0107a1af4a4325c502fe2632c7d0f4dcf3da98ba Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Tue, 16 Dec 2008 17:03:47 +0000
Subject: [PATCH] Assured Replication: - all unit tests for safe data mode - assured replication code corrections (thanks to safe data unit tests) => Still every unit tests for safe read mode to do...
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 172 +++++++++++++++++++++++++++++++++++---------------------
1 files changed, 107 insertions(+), 65 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index cf68944..9408c24 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -276,10 +276,10 @@
// According to assured sub-mode, prepare structures to keep track of
// the acks we are interested in.
AssuredMode assuredMode = update.getAssuredMode();
- if (assuredMode != AssuredMode.SAFE_DATA_MODE)
+ if (assuredMode == AssuredMode.SAFE_DATA_MODE)
{
preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler);
- } else if (assuredMode != AssuredMode.SAFE_READ_MODE)
+ } else if (assuredMode == AssuredMode.SAFE_READ_MODE)
{
preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler);
} else
@@ -525,7 +525,15 @@
for (ServerHandler handler : replicationServers.values())
{
if (handler.getGroupId() == groupId)
- expectedServers.add(handler.getServerId());
+ // No ack expected from a RS with different group id
+ {
+ if ((generationId > 0) &&
+ (generationId == handler.getGenerationId()))
+ // No ack expected from a RS with bad gen id
+ {
+ expectedServers.add(handler.getServerId());
+ }
+ }
}
}
@@ -538,13 +546,29 @@
continue;
}
if (handler.getGroupId() == groupId)
+ // No ack expected from a DS with different group id
{
- if (handler.getStatus() == ServerStatus.NORMAL_STATUS)
+ ServerStatus serverStatus = handler.getStatus();
+ if (serverStatus == ServerStatus.NORMAL_STATUS)
{
expectedServers.add(handler.getServerId());
} else
+ // No ack expected from a DS with wrong status
{
- wrongStatusServers.add(handler.getServerId());
+ if (serverStatus == ServerStatus.DEGRADED_STATUS)
+ {
+ wrongStatusServers.add(handler.getServerId());
+ } else
+ {
+ /*
+ * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
+ * We do not want this to be reported as an error to the update
+ * maker -> no pollution or potential missunderstanding when
+ * reading logs or monitoring and it was just administration (for
+ * instance new server is being configured in topo: it goes in bad
+ * gen then then full full update).
+ */
+ }
}
}
}
@@ -589,7 +613,7 @@
UpdateMsg update, ServerHandler sourceHandler) throws IOException
{
ChangeNumber cn = update.getChangeNumber();
- boolean interestedInAcks = true;
+ boolean interestedInAcks = false;
byte safeDataLevel = update.getSafeDataLevel();
byte groupId = replicationServer.getGroupId();
byte sourceGroupId = sourceHandler.getGroupId();
@@ -600,47 +624,55 @@
Short.toString(replicationServer.getServerId()),
Byte.toString(safeDataLevel), baseDn, update.toString());
logError(errorMsg);
- interestedInAcks = false;
} else if (sourceGroupId != groupId)
{
// Assured feature does not cross different group ids
- interestedInAcks = false;
} else
{
- if (sourceHandler.isLDAPserver())
+ if ((generationId > 0) &&
+ (generationId == sourceHandler.getGenerationId()))
+ // Ignore assured updates from wrong generationid servers
{
- if (safeDataLevel == (byte) 1)
+ if (sourceHandler.isLDAPserver())
{
- // Immediatly return the ack for an assured message in safe data mode
- // with safe data level 1, coming from a DS. No need to wait for more
- // acks
- AckMsg ack = new AckMsg(cn);
- sourceHandler.sendAck(ack);
- interestedInAcks = false; // No further acks to obtain
+ if (safeDataLevel == (byte) 1)
+ {
+ // Immediatly return the ack for an assured message in safe data
+ // mode with safe data level 1, coming from a DS. No need to wait
+ // for more acks
+ AckMsg ack = new AckMsg(cn);
+ sourceHandler.sendAck(ack);
+ } else
+ {
+ if (safeDataLevel != (byte) 0)
+ {
+ // level > 1 : We need further acks
+ // The message will be posted in assured mode to elligible
+ // servers. The embedded safe data level is not changed, and his
+ // value will be used by a remote RS to determine if he must send
+ // an ack (level > 1) or not (level = 1)
+ interestedInAcks = true;
+ } else
+ {
+ // Should never happen
+ }
+ }
} else
- {
- // level > 1 : We need further acks
- // The message will be posted in assured mode to elligible servers.
- // The embedded safe data level is not changed, and his value will be
- // used by a remote RS to determine if he must send an ack (level > 1)
- // or not (level = 1)
- }
- } else
- { // A RS sent us the safe data message, for sure no futher acks to wait
- interestedInAcks = false;
- if (safeDataLevel == (byte) 1)
- {
- // The original level was 1 so the RS that sent us this message should
- // have already sent his ack to the sender DS. Level 1 has already
- // been reached so no further acks to wait
- // This should not happen in theory as the sender RS server should
- // have sent us a matching not assured message so we should not come
- // to here.
- } else
- {
- // level > 1, so Ack this message to originator RS
- AckMsg ack = new AckMsg(cn);
- sourceHandler.sendAck(ack);
+ { // A RS sent us the safe data message, for sure no futher acks to wait
+ if (safeDataLevel == (byte) 1)
+ {
+ // The original level was 1 so the RS that sent us this message
+ // should have already sent his ack to the sender DS. Level 1 has
+ // already been reached so no further acks to wait.
+ // This should not happen in theory as the sender RS server should
+ // have sent us a matching not assured message so we should not come
+ // to here.
+ } else
+ {
+ // level > 1, so Ack this message to originator RS
+ AckMsg ack = new AckMsg(cn);
+ sourceHandler.sendAck(ack);
+ }
}
}
}
@@ -654,39 +686,46 @@
for (ServerHandler handler : replicationServers.values())
{
if (handler.getGroupId() == groupId)
- expectedServers.add(handler.getServerId());
+ // No ack expected from a RS with different group id
+ {
+ if ((generationId > 0) &&
+ (generationId == handler.getGenerationId()))
+ // No ack expected from a RS with bad gen id
+ {
+ expectedServers.add(handler.getServerId());
+ }
+ }
}
}
-
- // Look for DS elligible for assured
- for (ServerHandler handler : directoryServers.values())
- {
- // Don't forward the change to the server that just sent it
- if (handler == sourceHandler)
- {
- continue;
- }
- if (handler.getGroupId() == groupId)
- expectedServers.add(handler.getServerId());
- }
}
// Return computed structures
PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
- if (interestedInAcks && (expectedServers.size() > 0))
+ int nExpectedServers = expectedServers.size();
+ if (interestedInAcks) // interestedInAcks so level > 1
{
- // Some other acks to wait for
- preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
- sourceHandler, update.getSafeDataLevel());
- preparedAssuredInfo.expectedServers = expectedServers;
- }
-
- if (interestedInAcks && (preparedAssuredInfo.expectedServers == null))
- {
- // level > 1 and source is a DS but no elligible servers found, send the
- // ack immediatly
- AckMsg ack = new AckMsg(cn);
- sourceHandler.sendAck(ack);
+ if (nExpectedServers > 0)
+ {
+ // Some other acks to wait for
+ int sdl = update.getSafeDataLevel();
+ int neededAdditionalServers = sdl - 1;
+ // Change the number of expected acks if not enough available elligible
+ // servers: the level is a best effort thing, we do not want to timeout
+ // at every assured SD update for instance if a RS has had his gen id
+ // resetted
+ byte finalSdl = ((nExpectedServers >= neededAdditionalServers) ?
+ (byte)sdl : // Keep level as it was
+ (byte)(nExpectedServers+1)); // Change level to match what's available
+ preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
+ sourceHandler, finalSdl, expectedServers);
+ preparedAssuredInfo.expectedServers = expectedServers;
+ } else
+ {
+ // level > 1 and source is a DS but no elligible servers found, send the
+ // ack immediatly
+ AckMsg ack = new AckMsg(cn);
+ sourceHandler.sendAck(ack);
+ }
}
return preparedAssuredInfo;
@@ -1436,6 +1475,9 @@
*/
public void shutdown()
{
+ // Terminate the assured timer
+ assuredTimeoutTimer.cancel();
+
// Close session with other changelogs
for (ServerHandler serverHandler : replicationServers.values())
{
--
Gitblit v1.10.0