From a719d21181a3b1c98c16bc677e892cf67fed4e7f Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 18 Dec 2008 17:13:46 +0000
Subject: [PATCH] Assured Replication: - support for dynamic reconfiguration (domain and replication server) - performance improvement in domain (less lock time between sending threads) - performance improvement in server (safe data ack before DB push) - more monitoring info for safe read mode Misc: - support for dynamic domain group id reconfiguration
---
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java | 18 -
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 1
opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java | 9
opends/src/messages/messages/replication.properties | 2
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 122 ++++++++++--
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 8
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 2
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 68 +++---
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 356 ++++++++++++++++++++++++-----------
9 files changed, 398 insertions(+), 188 deletions(-)
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 64354b8..ca2ee16 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -349,7 +349,7 @@
generation ID=%s when expected generation ID=%s
NOTICE_DS_RECEIVED_ACK_ERROR_147=In replication service %s and server id %s, the \
assured update message %s was acknowledged with the following errors: %s
-SEVERE_ERR_DS_ACK_TIMEOUT_148=In replication service %s, timeout after %s ms \
+NOTICE_DS_ACK_TIMEOUT_148=In replication service %s, timeout after %s ms \
waiting for the acknowledgement of the assured update message: %s
SEVERE_ERR_DS_UNKNOWN_ASSURED_MODE_149=In directory server %s, received unknown \
assured update mode: %s, for domain %s. Message: %s
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 5f1372a..076d5d6 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -59,6 +59,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
@@ -318,26 +319,8 @@
configDn = configuration.dn();
this.updateToReplayQueue = updateToReplayQueue;
- /*
- * Fill assured configuration properties
- */
- AssuredType assuredType = configuration.getAssuredType();
- switch (assuredType)
- {
- case NOT_ASSURED:
- setAssured(false);
- break;
- case SAFE_DATA:
- setAssured(true);
- setAssuredMode(AssuredMode.SAFE_DATA_MODE);
- break;
- case SAFE_READ:
- setAssured(true);
- setAssuredMode(AssuredMode.SAFE_READ_MODE);
- break;
- }
- setAssuredSdLevel((byte)configuration.getAssuredSdLevel());
- setAssuredTimeout(configuration.getAssuredTimeout());
+ // Get assured configuration
+ readAssuredConfig(configuration);
setGroupId((byte)configuration.getGroupId());
setURLs(configuration.getReferralsUrl());
@@ -405,6 +388,72 @@
}
/**
+ * Gets and stores the assured replication configuration parameters. Returns
+ * a boolean indicating if the passed configuration has changed compared to
+ * previous values and the changes require a reconnection.
+ * @param configuration The configuration object
+ * @return True if the assured configuration changed and we need to reconnect
+ */
+ private boolean readAssuredConfig(ReplicationDomainCfg configuration)
+ {
+ boolean needReconnect = false;
+
+ byte newSdLevel = (byte) configuration.getAssuredSdLevel();
+ if ((isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)) &&
+ (newSdLevel != getAssuredSdLevel()))
+ {
+ needReconnect = true;
+ }
+
+ AssuredType newAssuredType = configuration.getAssuredType();
+ switch (newAssuredType)
+ {
+ case NOT_ASSURED:
+ if (isAssured())
+ {
+ needReconnect = true;
+ }
+ break;
+ case SAFE_DATA:
+ if (!isAssured() ||
+ (isAssured() && (getAssuredMode() == AssuredMode.SAFE_READ_MODE)))
+ {
+ needReconnect = true;
+ }
+ break;
+ case SAFE_READ:
+ if (!isAssured() ||
+ (isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)))
+ {
+ needReconnect = true;
+ }
+ break;
+ }
+
+ switch (newAssuredType)
+ {
+ case NOT_ASSURED:
+ setAssured(false);
+ break;
+ case SAFE_DATA:
+ setAssured(true);
+ setAssuredMode(AssuredMode.SAFE_DATA_MODE);
+ break;
+ case SAFE_READ:
+ setAssured(true);
+ setAssuredMode(AssuredMode.SAFE_READ_MODE);
+ break;
+ }
+ setAssuredSdLevel(newSdLevel);
+
+ // Changing timeout does not require restart as it is not sent in
+ // StartSessionMsg
+ setAssuredTimeout(configuration.getAssuredTimeout());
+
+ return needReconnect;
+ }
+
+ /**
* Returns the base DN of this ReplicationDomain.
*
* @return The base DN of this ReplicationDomain
@@ -836,7 +885,27 @@
if (!op.isSynchronizationOperation())
{
+ // If assured replication is configured, this will prepare blocking
+ // mechanism. If assured replication is disabled, this returns
+ // immediately
+ prepareWaitForAckIfAssuredEnabled(msg);
+
pendingChanges.pushCommittedChanges();
+
+ // If assured replication is enabled, this will wait for the matching
+ // ack or time out. If assured replication is disabled, this returns
+ // immediately
+ try
+ {
+ waitForAckIfAssuredEnabled(msg);
+ } catch (TimeoutException ex)
+ {
+ // This exception may only be raised if assured replication is
+ // enabled
+ Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getServiceID(),
+ Long.toString(getAssuredTimeout()), msg.toString());
+ logError(errorMsg);
+ }
}
}
@@ -2583,7 +2652,18 @@
changeConfig(
configuration.getReplicationServer(),
configuration.getWindowSize(),
- configuration.getHeartbeatInterval());
+ configuration.getHeartbeatInterval(),
+ (byte)configuration.getGroupId());
+
+ // Get assured configuration
+ boolean needReconnect = readAssuredConfig(configuration);
+
+ // Reconnect if required
+ if (needReconnect)
+ {
+ disableService();
+ enableService();
+ }
return new ConfigChangeResult(ResultCode.SUCCESS, false);
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
index a284aae..b9b512f 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -26,15 +26,11 @@
*/
package org.opends.server.replication.plugin;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.concurrent.TimeoutException;
-import org.opends.messages.Message;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
@@ -183,18 +179,8 @@
{
numSentUpdates++;
LDAPUpdateMsg updateMsg = firstChange.getMsg();
- try
- {
- domain.publish(updateMsg);
- } catch (TimeoutException ex) {
- // This exception may only be raised if assured replication is
- // enabled
- Message errorMsg = ERR_DS_ACK_TIMEOUT.get(
- domain.getServiceID(), Long.toString(domain.getAssuredTimeout()),
- updateMsg.toString());
- logError(errorMsg);
- }
- }
+ domain.publish(updateMsg);
+ }
pendingChanges.remove(firstChangeNumber);
if (pendingChanges.isEmpty())
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index be59878..b0e043f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -656,6 +656,7 @@
}
rcvWindow = configuration.getWindowSize();
+ assuredTimeout = configuration.getAssuredTimeout();
// changing the listen port requires to stop the listen thread
// and restart it.
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 9408c24..2768301 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -213,40 +213,6 @@
generationId = sourceHandler.getGenerationId();
}
- // look for the dbHandler that is responsible for the LDAP server which
- // generated the change.
- DbHandler dbHandler = null;
- synchronized (sourceDbHandlers)
- {
- dbHandler = sourceDbHandlers.get(id);
- if (dbHandler == null)
- {
- try
- {
- dbHandler = replicationServer.newDbHandler(id, baseDn);
- generationIdSavedStatus = true;
- } catch (DatabaseException e)
- {
- /*
- * Because of database problem we can't save any more changes
- * from at least one LDAP server.
- * This replicationServer therefore can't do it's job properly anymore
- * and needs to close all its connections and shutdown itself.
- */
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
- return;
- }
- sourceDbHandlers.put(id, dbHandler);
- }
- }
-
- // Publish the messages to the source handler
- dbHandler.add(update);
-
/**
* If this is an assured message (a message requesting ack), we must
* construct the ExpectedAcksInfo object with the right number of expected
@@ -297,6 +263,40 @@
}
}
+ // look for the dbHandler that is responsible for the LDAP server which
+ // generated the change.
+ DbHandler dbHandler = null;
+ synchronized (sourceDbHandlers)
+ {
+ dbHandler = sourceDbHandlers.get(id);
+ if (dbHandler == null)
+ {
+ try
+ {
+ dbHandler = replicationServer.newDbHandler(id, baseDn);
+ generationIdSavedStatus = true;
+ } catch (DatabaseException e)
+ {
+ /*
+ * Because of database problem we can't save any more changes
+ * from at least one LDAP server.
+ * This replicationServer therefore can't do it's job properly anymore
+ * and needs to close all its connections and shutdown itself.
+ */
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ replicationServer.shutdown();
+ return;
+ }
+ sourceDbHandlers.put(id, dbHandler);
+ }
+ }
+
+ // Publish the messages to the source handler
+ dbHandler.add(update);
+
List<Short> expectedServers = null;
if (assuredMessage)
{
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c0dff69..d70e1f2 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -1548,9 +1548,11 @@
*
* @return A boolean indicating if the changes
* requires to restart the service.
+ * @param groupId The new group id to use
*/
public boolean changeConfig(
- Collection<String> replicationServers, int window, long heartbeatInterval)
+ Collection<String> replicationServers, int window, long heartbeatInterval,
+ byte groupId)
{
// These parameters needs to be renegociated with the ReplicationServer
// so if they have changed, that requires restarting the session with
@@ -1563,7 +1565,8 @@
(!(replicationServers.size() == servers.size()
&& replicationServers.containsAll(servers))) ||
window != this.maxRcvWindow ||
- heartbeatInterval != this.heartbeatInterval)
+ heartbeatInterval != this.heartbeatInterval ||
+ (groupId != this.groupId))
{
needToRestartSession = true;
}
@@ -1573,6 +1576,7 @@
this.maxRcvWindow = window;
this.halfRcvWindow = window / 2;
this.heartbeatInterval = heartbeatInterval;
+ this.groupId = groupId;
return needToRestartSession;
}
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index fa95572..7c4df2b 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -215,7 +215,7 @@
// Safe Data level (used when assuredMode is SAFE_DATA)
private byte assuredSdLevel = (byte)1;
// The timeout in ms that should be used, when waiting for assured acks
- private long assuredTimeout = 1000;
+ private long assuredTimeout = 2000;
// Group id
private byte groupId = (byte)1;
@@ -257,6 +257,14 @@
// format: <server id>:<number of failed updates>
private Map<Short,Integer> assuredSrServerNotAcknowledgedUpdates =
new HashMap<Short,Integer>();
+ // Number of updates received in Assured Mode, Safe Read request
+ private AtomicInteger receivedAssuredSrUpdates = new AtomicInteger(0);
+ // Number of updates received in Assured Mode, Safe Read request that we have
+ // acked without errors
+ private AtomicInteger receivedAssuredSrUpdatesAcked = new AtomicInteger(0);
+ // Number of updates received in Assured Mode, Safe Read request that we have
+ // acked with errors
+ private AtomicInteger receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0);
// Number of updates sent in Assured Mode, Safe Data
private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0);
// Number of updates sent in Assured Mode, Safe Data, that have been
@@ -765,86 +773,12 @@
}
numRcvdUpdates.incrementAndGet();
- return update;
- }
-
- /**
- * Wait for the processing of an assured message.
- *
- * @param msg The UpdateMsg for which we are waiting for an ack.
- * @throws TimeoutException When the configured timeout occurs waiting for the
- * ack.
- */
- private void waitForAck(UpdateMsg msg) throws TimeoutException
- {
- // Wait for the ack to be received, timing out if necessary
- long startTime = System.currentTimeMillis();
- synchronized (msg)
+ if (update.isAssured() && (update.getAssuredMode() ==
+ AssuredMode.SAFE_READ_MODE))
{
- ChangeNumber cn = msg.getChangeNumber();
- while (waitingAckMsgs.containsKey(cn))
- {
- try
- {
- // WARNING: this timeout may be difficult to optimize: too low, it
- // may use too much CPU, too high, it may penalize performance...
- msg.wait(10);
- } catch (InterruptedException e)
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("waitForAck method interrupted for replication " +
- "serviceID: " + serviceID);
- }
- break;
- }
- // Timeout ?
- if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
- {
- // Timeout occured, be sure that ack is not being received and if so,
- // remove the update from the wait list, log the timeout error and
- // also update assured monitoring counters
- UpdateMsg update;
- synchronized (waitingAckMsgs)
- {
- update = waitingAckMsgs.remove(cn);
- }
-
- if (update != null)
- {
- // No luck, this is a real timeout
- // Increment assured replication monitoring counters
- switch (msg.getAssuredMode())
- {
- case SAFE_READ_MODE:
- assuredSrNotAcknowledgedUpdates.incrementAndGet();
- assuredSrTimeoutUpdates.incrementAndGet();
- // Increment number of errors for our RS
- updateAssuredErrorsByServer(
- assuredSrServerNotAcknowledgedUpdates,
- broker.getRsServerId());
- break;
- case SAFE_DATA_MODE:
- assuredSdTimeoutUpdates.incrementAndGet();
- // Increment number of errors for our RS
- updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
- broker.getRsServerId());
- break;
- default:
- // Should not happen
- }
-
- throw new TimeoutException("No ack received for message cn: " + cn +
- " and replication servceID: " + serviceID + " after " +
- assuredTimeout + " ms.");
- } else
- {
- // Ack received just before timeout limit: we can exit
- break;
- }
- }
- }
+ receivedAssuredSrUpdates.incrementAndGet();
}
+ return update;
}
/**
@@ -2002,7 +1936,50 @@
*/
public Map<Short, Integer> getAssuredSrServerNotAcknowledgedUpdates()
{
- return assuredSrServerNotAcknowledgedUpdates;
+ // Clone a snapshot with synchronized section to have a consistent view in
+ // monitoring
+ Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
+ synchronized(assuredSrServerNotAcknowledgedUpdates)
+ {
+ Set<Short> keySet = assuredSrServerNotAcknowledgedUpdates.keySet();
+ for (Short serverId : keySet)
+ {
+ Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId);
+ snapshot.put(serverId, i);
+ }
+ }
+ return snapshot;
+ }
+
+ /**
+ * Gets the number of updates received in assured safe read mode request.
+ * @return The number of updates received in assured safe read mode request.
+ */
+ public int getReceivedAssuredSrUpdates()
+ {
+ return receivedAssuredSrUpdates.get();
+ }
+
+ /**
+ * Gets the number of updates received in assured safe read mode that we acked
+ * without error (no replay error).
+ * @return The number of updates received in assured safe read mode that we
+ * acked without error (no replay error).
+ */
+ public int getReceivedAssuredSrUpdatesAcked()
+ {
+ return this.receivedAssuredSrUpdatesAcked.get();
+ }
+
+ /**
+ * Gets the number of updates received in assured safe read mode that we did
+ * not ack due to error (replay error).
+ * @return The number of updates received in assured safe read mode that we
+ * did not ack due to error (replay error).
+ */
+ public int getReceivedAssuredSrUpdatesNotAcked()
+ {
+ return this.receivedAssuredSrUpdatesNotAcked.get();
}
/**
@@ -2044,7 +2021,19 @@
*/
public Map<Short, Integer> getAssuredSdServerTimeoutUpdates()
{
- return assuredSdServerTimeoutUpdates;
+ // Clone a snapshot with synchronized section to have a consistent view in
+ // monitoring
+ Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
+ synchronized(assuredSdServerTimeoutUpdates)
+ {
+ Set<Short> keySet = assuredSdServerTimeoutUpdates.keySet();
+ for (Short serverId : keySet)
+ {
+ Integer i = assuredSdServerTimeoutUpdates.get(serverId);
+ snapshot.put(serverId, i);
+ }
+ }
+ return snapshot;
}
/**
@@ -2068,6 +2057,9 @@
assuredSrWrongStatusUpdates = new AtomicInteger(0);
assuredSrReplayErrorUpdates = new AtomicInteger(0);
assuredSrServerNotAcknowledgedUpdates = new HashMap<Short,Integer>();
+ receivedAssuredSrUpdates = new AtomicInteger(0);
+ receivedAssuredSrUpdatesAcked = new AtomicInteger(0);
+ receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0);
assuredSdSentUpdates = new AtomicInteger(0);
assuredSdAcknowledgedUpdates = new AtomicInteger(0);
assuredSdTimeoutUpdates = new AtomicInteger(0);
@@ -2199,16 +2191,20 @@
* @param windowSize The window size that this domain should use.
* @param heartbeatInterval The heartbeatInterval that this domain should
* use.
+ * @param groupId The new group id to use
*/
public void changeConfig(
Collection<String> replicationServers,
int windowSize,
- long heartbeatInterval)
+ long heartbeatInterval,
+ byte groupId)
{
+ this.groupId = groupId;
+
if (broker != null)
{
if (broker.changeConfig(
- replicationServers, windowSize, heartbeatInterval))
+ replicationServers, windowSize, heartbeatInterval, groupId))
{
disableService();
enableService();
@@ -2319,6 +2315,13 @@
ackMsg.setFailedServers(idList);
}
broker.publish(ackMsg);
+ if (replayErrorMsg != null)
+ {
+ receivedAssuredSrUpdatesNotAcked.incrementAndGet();
+ } else
+ {
+ receivedAssuredSrUpdatesAcked.incrementAndGet();
+ }
}
} else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
{
@@ -2338,28 +2341,39 @@
}
/**
- * Publish an {@link UpdateMsg} to the Replication Service.
- * <p>
- * The Replication Service will handle the delivery of this {@link UpdateMsg}
- * to all the participants of this Replication Domain.
- * These members will be receive this {@link UpdateMsg} through a call
- * of the {@link #processUpdate(UpdateMsg)} message.
+ * Prepare a message if it is to be sent in assured mode.
+ * If the assured mode is enabled, this method should be called before
+ * publish(UpdateMsg msg) method. This will configure the update accordingly
+ * before it is sent and will prepare the mechanism that will block until the
+ * matching ack is received. To wait for the ack after publish call, use
+ * the waitForAckIfAssuredEnabled() method.
+ * The expected typical usage in a service inheriting from this class is
+ * the following sequence:
+ * UpdateMsg msg = xxx;
+ * prepareWaitForAckIfAssuredEnabled(msg);
+ * publish(msg);
+ * waitForAckIfAssuredEnabled(msg);
*
- * @param msg The UpdateMsg that should be pushed.
- * @throws TimeoutException When assured replication is enabled and the
- * configured timeout occurs when blocked waiting for the ack.
+ * Note: prepareWaitForAckIfAssuredEnabled and waitForAckIfAssuredEnabled have
+ * no effect if assured replication is disabled.
+ * Note: this mechanism should not be used if using publish(byte[] msg)
+ * version as usage of these methods is already hidden inside.
+ *
+ * @param msg The update message to be sent soon.
*/
- public void publish(UpdateMsg msg) throws TimeoutException
+ protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
{
byte rsGroupId = broker.getRsGroupId();
-
- // If assured configured, set message accordingly to request an ack in the
- // right assured mode.
- // No ack requested for a RS with a different group id. Assured replication
- // suported for the same locality, i.e: a topology working in the same
- // geographical location). If we are connected to a RS which is not in our
- // locality, no need to ask for an ack.
- if ( assured && ( rsGroupId == groupId ) )
+ /*
+ * If assured configured, set message accordingly to request an ack in the
+ * right assured mode.
+ * No ack requested for a RS with a different group id. Assured
+ * replication suported for the same locality, i.e: a topology working in
+ * the same
+ * geographical location). If we are connected to a RS which is not in our
+ * locality, no need to ask for an ack.
+ */
+ if (assured && (rsGroupId == groupId))
{
msg.setAssured(true);
msg.setAssuredMode(assuredMode);
@@ -2373,18 +2387,29 @@
waitingAckMsgs.put(msg.getChangeNumber(), msg);
}
}
+ }
- // Publish the update
- broker.publish(msg);
- state.update(msg.getChangeNumber());
- numSentUpdates.incrementAndGet();
+ /**
+ * Wait for the processing of an assured message after it has been sent, if
+ * assured replication is configured, otherwise, do nothing.
+ * The prepareWaitForAckIfAssuredEnabled method should have been called
+ * before, see its comment for the full picture.
+ *
+ * @param msg The UpdateMsg for which we are waiting for an ack.
+ * @throws TimeoutException When the configured timeout occurs waiting for the
+ * ack.
+ */
+ protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
+ throws TimeoutException
+ {
+ byte rsGroupId = broker.getRsGroupId();
// If assured mode configured, wait for acknowledgement for the just sent
// message
- if ( assured && ( rsGroupId == groupId ) )
+ if (assured && (rsGroupId == groupId))
{
// Increment assured replication monitoring counters
- switch(assuredMode)
+ switch (assuredMode)
{
case SAFE_READ_MODE:
assuredSrSentUpdates.incrementAndGet();
@@ -2393,12 +2418,100 @@
assuredSdSentUpdates.incrementAndGet();
break;
default:
- // Should not happen
+ // Should not happen
}
-
- // Now wait for ack matching the sent assured update
- waitForAck(msg);
+ } else
+ {
+ // Not assured or bad group id, return immediately
+ return;
}
+
+ // Wait for the ack to be received, timing out if necessary
+ long startTime = System.currentTimeMillis();
+ synchronized (msg)
+ {
+ ChangeNumber cn = msg.getChangeNumber();
+ while (waitingAckMsgs.containsKey(cn))
+ {
+ try
+ {
+ // WARNING: this timeout may be difficult to optimize: too low, it
+ // may use too much CPU, too high, it may penalize performance...
+ msg.wait(10);
+ } catch (InterruptedException e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("waitForAck method interrupted for replication " +
+ "serviceID: " + serviceID);
+ }
+ break;
+ }
+ // Timeout ?
+ if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
+ {
+ // Timeout occured, be sure that ack is not being received and if so,
+ // remove the update from the wait list, log the timeout error and
+ // also update assured monitoring counters
+ UpdateMsg update;
+ synchronized (waitingAckMsgs)
+ {
+ update = waitingAckMsgs.remove(cn);
+ }
+
+ if (update != null)
+ {
+ // No luck, this is a real timeout
+ // Increment assured replication monitoring counters
+ switch (msg.getAssuredMode())
+ {
+ case SAFE_READ_MODE:
+ assuredSrNotAcknowledgedUpdates.incrementAndGet();
+ assuredSrTimeoutUpdates.incrementAndGet();
+ // Increment number of errors for our RS
+ updateAssuredErrorsByServer(
+ assuredSrServerNotAcknowledgedUpdates,
+ broker.getRsServerId());
+ break;
+ case SAFE_DATA_MODE:
+ assuredSdTimeoutUpdates.incrementAndGet();
+ // Increment number of errors for our RS
+ updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
+ broker.getRsServerId());
+ break;
+ default:
+ // Should not happen
+ }
+
+ throw new TimeoutException("No ack received for message cn: " + cn +
+ " and replication servceID: " + serviceID + " after " +
+ assuredTimeout + " ms.");
+ } else
+ {
+ // Ack received just before timeout limit: we can exit
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Publish an {@link UpdateMsg} to the Replication Service.
+ * <p>
+ * The Replication Service will handle the delivery of this {@link UpdateMsg}
+ * to all the participants of this Replication Domain.
+ * These members will be receive this {@link UpdateMsg} through a call
+ * of the {@link #processUpdate(UpdateMsg)} message.
+ *
+ * @param msg The UpdateMsg that should be pushed.
+ */
+ public void publish(UpdateMsg msg)
+ {
+ // Publish the update
+ broker.publish(msg);
+ state.update(msg.getChangeNumber());
+ numSentUpdates.incrementAndGet();
}
/**
@@ -2410,12 +2523,27 @@
public void publish(byte[] msg)
{
UpdateMsg update = new UpdateMsg(generator.newChangeNumber(), msg);
+
+ // If assured replication is configured, this will prepare blocking
+ // mechanism. If assured replication is disabled, this returns
+ // immediately
+ prepareWaitForAckIfAssuredEnabled(update);
+
+ publish(update);
+
try
{
- publish(update);
- } catch (TimeoutException e)
+ // If assured replication is enabled, this will wait for the matching
+ // ack or time out. If assured replication is disabled, this returns
+ // immediately
+ waitForAckIfAssuredEnabled(update);
+ } catch (TimeoutException ex)
{
- // Should never happen as assured mode not requested
+ // This exception may only be raised if assured replication is
+ // enabled
+ Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString(
+ assuredTimeout), msg.toString());
+ logError(errorMsg);
}
}
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java b/opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java
index e677827..69efcf9 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java
@@ -210,6 +210,15 @@
attributes.add(builder.toAttribute());
}
+ addMonitorData(attributes, "received-assured-sr-updates",
+ domain.getReceivedAssuredSrUpdates());
+
+ addMonitorData(attributes, "received-assured-sr-updates-acked",
+ domain.getReceivedAssuredSrUpdatesAcked());
+
+ addMonitorData(attributes, "received-assured-sr-updates-not-acked",
+ domain.getReceivedAssuredSrUpdatesNotAcked());
+
addMonitorData(attributes, "assured-sd-sent-updates",
domain.getAssuredSdSentUpdates());
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index d08f668..3d4941d 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -622,7 +622,9 @@
UUID.randomUUID().toString());
// Send it (this uses the defined assured conf at constructor time)
+ prepareWaitForAckIfAssuredEnabled(delMsg);
publish(delMsg);
+ waitForAckIfAssuredEnabled(delMsg);
}
}
--
Gitblit v1.10.0