From a719d21181a3b1c98c16bc677e892cf67fed4e7f Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 18 Dec 2008 17:13:46 +0000
Subject: [PATCH] Assured Replication: - support for dynamic reconfiguration (domain and replication server) - performance improvement in domain (less lock time between sending threads) - performance improvement in server (safe data ack before DB push) - more monitoring info for safe read mode Misc: - support for dynamic domain group id reconfiguration
---
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 356 ++++++++++++++++++++++++++++++++++++++++-------------------
1 files changed, 242 insertions(+), 114 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index fa95572..7c4df2b 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -215,7 +215,7 @@
// Safe Data level (used when assuredMode is SAFE_DATA)
private byte assuredSdLevel = (byte)1;
// The timeout in ms that should be used, when waiting for assured acks
- private long assuredTimeout = 1000;
+ private long assuredTimeout = 2000;
// Group id
private byte groupId = (byte)1;
@@ -257,6 +257,14 @@
// format: <server id>:<number of failed updates>
private Map<Short,Integer> assuredSrServerNotAcknowledgedUpdates =
new HashMap<Short,Integer>();
+ // Number of updates received in Assured Mode, Safe Read request
+ private AtomicInteger receivedAssuredSrUpdates = new AtomicInteger(0);
+ // Number of updates received in Assured Mode, Safe Read request that we have
+ // acked without errors
+ private AtomicInteger receivedAssuredSrUpdatesAcked = new AtomicInteger(0);
+ // Number of updates received in Assured Mode, Safe Read request that we have
+ // acked with errors
+ private AtomicInteger receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0);
// Number of updates sent in Assured Mode, Safe Data
private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0);
// Number of updates sent in Assured Mode, Safe Data, that have been
@@ -765,86 +773,12 @@
}
numRcvdUpdates.incrementAndGet();
- return update;
- }
-
- /**
- * Wait for the processing of an assured message.
- *
- * @param msg The UpdateMsg for which we are waiting for an ack.
- * @throws TimeoutException When the configured timeout occurs waiting for the
- * ack.
- */
- private void waitForAck(UpdateMsg msg) throws TimeoutException
- {
- // Wait for the ack to be received, timing out if necessary
- long startTime = System.currentTimeMillis();
- synchronized (msg)
+ if (update.isAssured() && (update.getAssuredMode() ==
+ AssuredMode.SAFE_READ_MODE))
{
- ChangeNumber cn = msg.getChangeNumber();
- while (waitingAckMsgs.containsKey(cn))
- {
- try
- {
- // WARNING: this timeout may be difficult to optimize: too low, it
- // may use too much CPU, too high, it may penalize performance...
- msg.wait(10);
- } catch (InterruptedException e)
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("waitForAck method interrupted for replication " +
- "serviceID: " + serviceID);
- }
- break;
- }
- // Timeout ?
- if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
- {
- // Timeout occured, be sure that ack is not being received and if so,
- // remove the update from the wait list, log the timeout error and
- // also update assured monitoring counters
- UpdateMsg update;
- synchronized (waitingAckMsgs)
- {
- update = waitingAckMsgs.remove(cn);
- }
-
- if (update != null)
- {
- // No luck, this is a real timeout
- // Increment assured replication monitoring counters
- switch (msg.getAssuredMode())
- {
- case SAFE_READ_MODE:
- assuredSrNotAcknowledgedUpdates.incrementAndGet();
- assuredSrTimeoutUpdates.incrementAndGet();
- // Increment number of errors for our RS
- updateAssuredErrorsByServer(
- assuredSrServerNotAcknowledgedUpdates,
- broker.getRsServerId());
- break;
- case SAFE_DATA_MODE:
- assuredSdTimeoutUpdates.incrementAndGet();
- // Increment number of errors for our RS
- updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
- broker.getRsServerId());
- break;
- default:
- // Should not happen
- }
-
- throw new TimeoutException("No ack received for message cn: " + cn +
- " and replication servceID: " + serviceID + " after " +
- assuredTimeout + " ms.");
- } else
- {
- // Ack received just before timeout limit: we can exit
- break;
- }
- }
- }
+ receivedAssuredSrUpdates.incrementAndGet();
}
+ return update;
}
/**
@@ -2002,7 +1936,50 @@
*/
public Map<Short, Integer> getAssuredSrServerNotAcknowledgedUpdates()
{
- return assuredSrServerNotAcknowledgedUpdates;
+ // Clone a snapshot with synchronized section to have a consistent view in
+ // monitoring
+ Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
+ synchronized(assuredSrServerNotAcknowledgedUpdates)
+ {
+ Set<Short> keySet = assuredSrServerNotAcknowledgedUpdates.keySet();
+ for (Short serverId : keySet)
+ {
+ Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId);
+ snapshot.put(serverId, i);
+ }
+ }
+ return snapshot;
+ }
+
+ /**
+ * Gets the number of updates received in assured safe read mode request.
+ * @return The number of updates received in assured safe read mode request.
+ */
+ public int getReceivedAssuredSrUpdates()
+ {
+ return receivedAssuredSrUpdates.get();
+ }
+
+ /**
+ * Gets the number of updates received in assured safe read mode that we acked
+ * without error (no replay error).
+ * @return The number of updates received in assured safe read mode that we
+ * acked without error (no replay error).
+ */
+ public int getReceivedAssuredSrUpdatesAcked()
+ {
+ return this.receivedAssuredSrUpdatesAcked.get();
+ }
+
+ /**
+ * Gets the number of updates received in assured safe read mode that we did
+ * not ack due to error (replay error).
+ * @return The number of updates received in assured safe read mode that we
+ * did not ack due to error (replay error).
+ */
+ public int getReceivedAssuredSrUpdatesNotAcked()
+ {
+ return this.receivedAssuredSrUpdatesNotAcked.get();
}
/**
@@ -2044,7 +2021,19 @@
*/
public Map<Short, Integer> getAssuredSdServerTimeoutUpdates()
{
- return assuredSdServerTimeoutUpdates;
+ // Clone a snapshot with synchronized section to have a consistent view in
+ // monitoring
+ Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
+ synchronized(assuredSdServerTimeoutUpdates)
+ {
+ Set<Short> keySet = assuredSdServerTimeoutUpdates.keySet();
+ for (Short serverId : keySet)
+ {
+ Integer i = assuredSdServerTimeoutUpdates.get(serverId);
+ snapshot.put(serverId, i);
+ }
+ }
+ return snapshot;
}
/**
@@ -2068,6 +2057,9 @@
assuredSrWrongStatusUpdates = new AtomicInteger(0);
assuredSrReplayErrorUpdates = new AtomicInteger(0);
assuredSrServerNotAcknowledgedUpdates = new HashMap<Short,Integer>();
+ receivedAssuredSrUpdates = new AtomicInteger(0);
+ receivedAssuredSrUpdatesAcked = new AtomicInteger(0);
+ receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0);
assuredSdSentUpdates = new AtomicInteger(0);
assuredSdAcknowledgedUpdates = new AtomicInteger(0);
assuredSdTimeoutUpdates = new AtomicInteger(0);
@@ -2199,16 +2191,20 @@
* @param windowSize The window size that this domain should use.
* @param heartbeatInterval The heartbeatInterval that this domain should
* use.
+ * @param groupId The new group id to use
*/
public void changeConfig(
Collection<String> replicationServers,
int windowSize,
- long heartbeatInterval)
+ long heartbeatInterval,
+ byte groupId)
{
+ this.groupId = groupId;
+
if (broker != null)
{
if (broker.changeConfig(
- replicationServers, windowSize, heartbeatInterval))
+ replicationServers, windowSize, heartbeatInterval, groupId))
{
disableService();
enableService();
@@ -2319,6 +2315,13 @@
ackMsg.setFailedServers(idList);
}
broker.publish(ackMsg);
+ if (replayErrorMsg != null)
+ {
+ receivedAssuredSrUpdatesNotAcked.incrementAndGet();
+ } else
+ {
+ receivedAssuredSrUpdatesAcked.incrementAndGet();
+ }
}
} else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
{
@@ -2338,28 +2341,39 @@
}
/**
- * Publish an {@link UpdateMsg} to the Replication Service.
- * <p>
- * The Replication Service will handle the delivery of this {@link UpdateMsg}
- * to all the participants of this Replication Domain.
- * These members will be receive this {@link UpdateMsg} through a call
- * of the {@link #processUpdate(UpdateMsg)} message.
+ * Prepare a message if it is to be sent in assured mode.
+ * If the assured mode is enabled, this method should be called before
+ * publish(UpdateMsg msg) method. This will configure the update accordingly
+ * before it is sent and will prepare the mechanism that will block until the
+ * matching ack is received. To wait for the ack after publish call, use
+ * the waitForAckIfAssuredEnabled() method.
+ * The expected typical usage in a service inheriting from this class is
+ * the following sequence:
+ * UpdateMsg msg = xxx;
+ * prepareWaitForAckIfAssuredEnabled(msg);
+ * publish(msg);
+ * waitForAckIfAssuredEnabled(msg);
*
- * @param msg The UpdateMsg that should be pushed.
- * @throws TimeoutException When assured replication is enabled and the
- * configured timeout occurs when blocked waiting for the ack.
+ * Note: prepareWaitForAckIfAssuredEnabled and waitForAckIfAssuredEnabled have
+ * no effect if assured replication is disabled.
+ * Note: this mechanism should not be used if using publish(byte[] msg)
+ * version as usage of these methods is already hidden inside.
+ *
+ * @param msg The update message to be sent soon.
*/
- public void publish(UpdateMsg msg) throws TimeoutException
+ protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
{
byte rsGroupId = broker.getRsGroupId();
-
- // If assured configured, set message accordingly to request an ack in the
- // right assured mode.
- // No ack requested for a RS with a different group id. Assured replication
- // suported for the same locality, i.e: a topology working in the same
- // geographical location). If we are connected to a RS which is not in our
- // locality, no need to ask for an ack.
- if ( assured && ( rsGroupId == groupId ) )
+ /*
+ * If assured configured, set message accordingly to request an ack in the
+ * right assured mode.
+ * No ack requested for a RS with a different group id. Assured
+ * replication suported for the same locality, i.e: a topology working in
+ * the same
+ * geographical location). If we are connected to a RS which is not in our
+ * locality, no need to ask for an ack.
+ */
+ if (assured && (rsGroupId == groupId))
{
msg.setAssured(true);
msg.setAssuredMode(assuredMode);
@@ -2373,18 +2387,29 @@
waitingAckMsgs.put(msg.getChangeNumber(), msg);
}
}
+ }
- // Publish the update
- broker.publish(msg);
- state.update(msg.getChangeNumber());
- numSentUpdates.incrementAndGet();
+ /**
+ * Wait for the processing of an assured message after it has been sent, if
+ * assured replication is configured, otherwise, do nothing.
+ * The prepareWaitForAckIfAssuredEnabled method should have been called
+ * before, see its comment for the full picture.
+ *
+ * @param msg The UpdateMsg for which we are waiting for an ack.
+ * @throws TimeoutException When the configured timeout occurs waiting for the
+ * ack.
+ */
+ protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
+ throws TimeoutException
+ {
+ byte rsGroupId = broker.getRsGroupId();
// If assured mode configured, wait for acknowledgement for the just sent
// message
- if ( assured && ( rsGroupId == groupId ) )
+ if (assured && (rsGroupId == groupId))
{
// Increment assured replication monitoring counters
- switch(assuredMode)
+ switch (assuredMode)
{
case SAFE_READ_MODE:
assuredSrSentUpdates.incrementAndGet();
@@ -2393,12 +2418,100 @@
assuredSdSentUpdates.incrementAndGet();
break;
default:
- // Should not happen
+ // Should not happen
}
-
- // Now wait for ack matching the sent assured update
- waitForAck(msg);
+ } else
+ {
+ // Not assured or bad group id, return immediately
+ return;
}
+
+ // Wait for the ack to be received, timing out if necessary
+ long startTime = System.currentTimeMillis();
+ synchronized (msg)
+ {
+ ChangeNumber cn = msg.getChangeNumber();
+ while (waitingAckMsgs.containsKey(cn))
+ {
+ try
+ {
+ // WARNING: this timeout may be difficult to optimize: too low, it
+ // may use too much CPU, too high, it may penalize performance...
+ msg.wait(10);
+ } catch (InterruptedException e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("waitForAck method interrupted for replication " +
+ "serviceID: " + serviceID);
+ }
+ break;
+ }
+ // Timeout ?
+ if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
+ {
+ // Timeout occured, be sure that ack is not being received and if so,
+ // remove the update from the wait list, log the timeout error and
+ // also update assured monitoring counters
+ UpdateMsg update;
+ synchronized (waitingAckMsgs)
+ {
+ update = waitingAckMsgs.remove(cn);
+ }
+
+ if (update != null)
+ {
+ // No luck, this is a real timeout
+ // Increment assured replication monitoring counters
+ switch (msg.getAssuredMode())
+ {
+ case SAFE_READ_MODE:
+ assuredSrNotAcknowledgedUpdates.incrementAndGet();
+ assuredSrTimeoutUpdates.incrementAndGet();
+ // Increment number of errors for our RS
+ updateAssuredErrorsByServer(
+ assuredSrServerNotAcknowledgedUpdates,
+ broker.getRsServerId());
+ break;
+ case SAFE_DATA_MODE:
+ assuredSdTimeoutUpdates.incrementAndGet();
+ // Increment number of errors for our RS
+ updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
+ broker.getRsServerId());
+ break;
+ default:
+ // Should not happen
+ }
+
+ throw new TimeoutException("No ack received for message cn: " + cn +
+ " and replication servceID: " + serviceID + " after " +
+ assuredTimeout + " ms.");
+ } else
+ {
+ // Ack received just before timeout limit: we can exit
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Publish an {@link UpdateMsg} to the Replication Service.
+ * <p>
+ * The Replication Service will handle the delivery of this {@link UpdateMsg}
+ * to all the participants of this Replication Domain.
+ * These members will be receive this {@link UpdateMsg} through a call
+ * of the {@link #processUpdate(UpdateMsg)} message.
+ *
+ * @param msg The UpdateMsg that should be pushed.
+ */
+ public void publish(UpdateMsg msg)
+ {
+ // Publish the update
+ broker.publish(msg);
+ state.update(msg.getChangeNumber());
+ numSentUpdates.incrementAndGet();
}
/**
@@ -2410,12 +2523,27 @@
public void publish(byte[] msg)
{
UpdateMsg update = new UpdateMsg(generator.newChangeNumber(), msg);
+
+ // If assured replication is configured, this will prepare blocking
+ // mechanism. If assured replication is disabled, this returns
+ // immediately
+ prepareWaitForAckIfAssuredEnabled(update);
+
+ publish(update);
+
try
{
- publish(update);
- } catch (TimeoutException e)
+ // If assured replication is enabled, this will wait for the matching
+ // ack or time out. If assured replication is disabled, this returns
+ // immediately
+ waitForAckIfAssuredEnabled(update);
+ } catch (TimeoutException ex)
{
- // Should never happen as assured mode not requested
+ // This exception may only be raised if assured replication is
+ // enabled
+ Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString(
+ assuredTimeout), msg.toString());
+ logError(errorMsg);
}
}
--
Gitblit v1.10.0