From a719d21181a3b1c98c16bc677e892cf67fed4e7f Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 18 Dec 2008 17:13:46 +0000
Subject: [PATCH] Assured Replication: - support for dynamic reconfiguration (domain and replication server) - performance improvement in domain (less lock time between sending threads) - performance improvement in server (safe data ack before DB push) - more monitoring info for safe read mode Misc: - support for dynamic domain group id reconfiguration

---
 opends/src/server/org/opends/server/replication/plugin/PendingChanges.java                                       |   18 -
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                                    |    1 
 opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java                                  |    9 
 opends/src/messages/messages/replication.properties                                                              |    2 
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                                |  122 ++++++++++--
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java                                   |    8 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java |    2 
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                              |   68 +++---
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                                   |  356 ++++++++++++++++++++++++-----------
 9 files changed, 398 insertions(+), 188 deletions(-)

diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 64354b8..ca2ee16 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -349,7 +349,7 @@
  generation ID=%s when expected generation ID=%s
 NOTICE_DS_RECEIVED_ACK_ERROR_147=In replication service %s and server id %s, the \
  assured update message %s was acknowledged with the following errors: %s
-SEVERE_ERR_DS_ACK_TIMEOUT_148=In replication service %s, timeout after %s ms \
+NOTICE_DS_ACK_TIMEOUT_148=In replication service %s, timeout after %s ms \
  waiting for the acknowledgement of the assured update message: %s
 SEVERE_ERR_DS_UNKNOWN_ASSURED_MODE_149=In directory server %s, received unknown \
  assured update mode: %s, for domain %s. Message: %s
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 5f1372a..076d5d6 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -59,6 +59,7 @@
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.CheckedOutputStream;
 import java.util.zip.DataFormatException;
@@ -318,26 +319,8 @@
     configDn = configuration.dn();
     this.updateToReplayQueue = updateToReplayQueue;
 
-    /*
-     * Fill assured configuration properties
-     */
-    AssuredType assuredType = configuration.getAssuredType();
-    switch (assuredType)
-    {
-      case NOT_ASSURED:
-        setAssured(false);
-        break;
-      case SAFE_DATA:
-        setAssured(true);
-        setAssuredMode(AssuredMode.SAFE_DATA_MODE);
-        break;
-      case SAFE_READ:
-        setAssured(true);
-        setAssuredMode(AssuredMode.SAFE_READ_MODE);
-        break;
-    }
-    setAssuredSdLevel((byte)configuration.getAssuredSdLevel());
-    setAssuredTimeout(configuration.getAssuredTimeout());
+    // Get assured configuration
+    readAssuredConfig(configuration);
 
     setGroupId((byte)configuration.getGroupId());
     setURLs(configuration.getReferralsUrl());
@@ -405,6 +388,72 @@
   }
 
   /**
+   * Stores the assured replication settings read from the configuration. A
+   * reconnection is needed if assured is toggled, if its mode changes, or if
+   * the safe data level changes while the previous mode was safe data.
+   * @param configuration The configuration object
+   * @return True if the assured configuration changed and we need to reconnect
+   */
+  private boolean readAssuredConfig(ReplicationDomainCfg configuration)
+  {
+    boolean needReconnect = false;
+
+    byte newSdLevel = (byte) configuration.getAssuredSdLevel();
+    if ((isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)) &&
+      (newSdLevel != getAssuredSdLevel()))
+    {
+      needReconnect = true;
+    }
+
+    AssuredType newAssuredType = configuration.getAssuredType();
+    switch (newAssuredType)
+    {
+      case NOT_ASSURED:
+        if (isAssured())
+        {
+          needReconnect = true;
+        }
+        break;
+      case SAFE_DATA:
+        if (!isAssured() ||
+          (isAssured() && (getAssuredMode() == AssuredMode.SAFE_READ_MODE)))
+        {
+          needReconnect = true;
+        }
+        break;
+      case SAFE_READ:
+        if (!isAssured() ||
+          (isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)))
+        {
+          needReconnect = true;
+        }
+        break;
+    }
+
+    switch (newAssuredType)
+    {
+      case NOT_ASSURED:
+        setAssured(false);
+        break;
+      case SAFE_DATA:
+        setAssured(true);
+        setAssuredMode(AssuredMode.SAFE_DATA_MODE);
+        break;
+      case SAFE_READ:
+        setAssured(true);
+        setAssuredMode(AssuredMode.SAFE_READ_MODE);
+        break;
+    }
+    setAssuredSdLevel(newSdLevel);
+
+    // The timeout is applied without reconnecting: per the original note, it
+    // is not sent in StartSessionMsg
+    setAssuredTimeout(configuration.getAssuredTimeout());
+
+    return needReconnect;
+  }
+
+  /**
    * Returns the base DN of this ReplicationDomain.
    *
    * @return The base DN of this ReplicationDomain
@@ -836,7 +885,27 @@
 
     if (!op.isSynchronizationOperation())
     {
+      // If assured replication is configured, this will prepare blocking
+      // mechanism. If assured replication is disabled, this returns
+      // immediately
+      prepareWaitForAckIfAssuredEnabled(msg);
+
       pendingChanges.pushCommittedChanges();
+
+      // If assured replication is enabled, this will wait for the matching
+      // ack or time out. If assured replication is disabled, this returns
+      // immediately
+      try
+      {
+        waitForAckIfAssuredEnabled(msg);
+      } catch (TimeoutException ex)
+      {
+        // This exception may only be raised if assured replication is
+        // enabled
+        Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getServiceID(),
+          Long.toString(getAssuredTimeout()), msg.toString());
+        logError(errorMsg);
+      }
     }
   }
 
@@ -2583,7 +2652,18 @@
     changeConfig(
         configuration.getReplicationServer(),
         configuration.getWindowSize(),
-        configuration.getHeartbeatInterval());
+        configuration.getHeartbeatInterval(),
+        (byte)configuration.getGroupId());
+
+    // Get assured configuration
+    boolean needReconnect = readAssuredConfig(configuration);
+
+    // Reconnect if required
+    if (needReconnect)
+    {
+      disableService();
+      enableService();
+    }
 
     return new ConfigChangeResult(ResultCode.SUCCESS, false);
   }
diff --git a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
index a284aae..b9b512f 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -26,15 +26,11 @@
  */
 package org.opends.server.replication.plugin;
 
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
 
 import java.util.NoSuchElementException;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
-import java.util.concurrent.TimeoutException;
-import org.opends.messages.Message;
 import org.opends.server.replication.service.ReplicationDomain;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ChangeNumberGenerator;
@@ -183,18 +179,8 @@
       {
         numSentUpdates++;
         LDAPUpdateMsg updateMsg = firstChange.getMsg();
-          try
-          {
-            domain.publish(updateMsg);
-          } catch (TimeoutException ex) {
-            // This exception may only be raised if assured replication is
-            // enabled
-            Message errorMsg = ERR_DS_ACK_TIMEOUT.get(
-              domain.getServiceID(), Long.toString(domain.getAssuredTimeout()),
-              updateMsg.toString());
-            logError(errorMsg);
-          }
-        }
+        domain.publish(updateMsg);
+      }
       pendingChanges.remove(firstChangeNumber);
 
       if (pendingChanges.isEmpty())
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index be59878..b0e043f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -656,6 +656,7 @@
     }
 
     rcvWindow = configuration.getWindowSize();
+    assuredTimeout = configuration.getAssuredTimeout();
 
     // changing the listen port requires to stop the listen thread
     // and restart it.
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 9408c24..2768301 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -213,40 +213,6 @@
       generationId = sourceHandler.getGenerationId();
     }
 
-    // look for the dbHandler that is responsible for the LDAP server which
-    // generated the change.
-    DbHandler dbHandler = null;
-    synchronized (sourceDbHandlers)
-    {
-      dbHandler = sourceDbHandlers.get(id);
-      if (dbHandler == null)
-      {
-        try
-        {
-          dbHandler = replicationServer.newDbHandler(id, baseDn);
-          generationIdSavedStatus = true;
-        } catch (DatabaseException e)
-        {
-          /*
-           * Because of database problem we can't save any more changes
-           * from at least one LDAP server.
-           * This replicationServer therefore can't do it's job properly anymore
-           * and needs to close all its connections and shutdown itself.
-           */
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-          mb.append(stackTraceToSingleLineString(e));
-          logError(mb.toMessage());
-          replicationServer.shutdown();
-          return;
-        }
-        sourceDbHandlers.put(id, dbHandler);
-      }
-    }
-
-    // Publish the messages to the source handler
-    dbHandler.add(update);
-
     /**
      * If this is an assured message (a message requesting ack), we must
      * construct the ExpectedAcksInfo object with the right number of expected
@@ -297,6 +263,40 @@
       }
     }
 
+    // look for the dbHandler that is responsible for the LDAP server which
+    // generated the change.
+    DbHandler dbHandler = null;
+    synchronized (sourceDbHandlers)
+    {
+      dbHandler = sourceDbHandlers.get(id);
+      if (dbHandler == null)
+      {
+        try
+        {
+          dbHandler = replicationServer.newDbHandler(id, baseDn);
+          generationIdSavedStatus = true;
+        } catch (DatabaseException e)
+        {
+          /*
+           * Because of database problem we can't save any more changes
+           * from at least one LDAP server.
+           * This replicationServer therefore can't do its job properly anymore
+           * and needs to close all its connections and shutdown itself.
+           */
+          MessageBuilder mb = new MessageBuilder();
+          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+          mb.append(stackTraceToSingleLineString(e));
+          logError(mb.toMessage());
+          replicationServer.shutdown();
+          return;
+        }
+        sourceDbHandlers.put(id, dbHandler);
+      }
+    }
+
+    // Publish the messages to the source handler
+    dbHandler.add(update);
+
     List<Short> expectedServers = null;
     if (assuredMessage)
     {
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c0dff69..d70e1f2 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -1548,9 +1548,11 @@
    *
    * @return                    A boolean indicating if the changes
    *                            requires to restart the service.
+   * @param groupId            The new group id to use
    */
   public boolean changeConfig(
-      Collection<String> replicationServers, int window, long heartbeatInterval)
+      Collection<String> replicationServers, int window, long heartbeatInterval,
+      byte groupId)
   {
     // These parameters needs to be renegociated with the ReplicationServer
     // so if they have changed, that requires restarting the session with
@@ -1563,7 +1565,8 @@
         (!(replicationServers.size() == servers.size()
         && replicationServers.containsAll(servers))) ||
         window != this.maxRcvWindow  ||
-        heartbeatInterval != this.heartbeatInterval)
+        heartbeatInterval != this.heartbeatInterval ||
+        (groupId != this.groupId))
     {
       needToRestartSession = true;
     }
@@ -1573,6 +1576,7 @@
     this.maxRcvWindow = window;
     this.halfRcvWindow = window / 2;
     this.heartbeatInterval = heartbeatInterval;
+    this.groupId = groupId;
 
     return needToRestartSession;
   }
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index fa95572..7c4df2b 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -215,7 +215,7 @@
   // Safe Data level (used when assuredMode is SAFE_DATA)
   private byte assuredSdLevel = (byte)1;
   // The timeout in ms that should be used, when waiting for assured acks
-  private long assuredTimeout = 1000;
+  private long assuredTimeout = 2000;
 
   // Group id
   private byte groupId = (byte)1;
@@ -257,6 +257,14 @@
   // format: <server id>:<number of failed updates>
   private Map<Short,Integer> assuredSrServerNotAcknowledgedUpdates =
     new HashMap<Short,Integer>();
+  // Number of updates received in Assured Mode, Safe Read request
+  private AtomicInteger receivedAssuredSrUpdates = new AtomicInteger(0);
+  // Number of updates received in Assured Mode, Safe Read request that we have
+  // acked without errors
+  private AtomicInteger receivedAssuredSrUpdatesAcked = new AtomicInteger(0);
+  // Number of updates received in Assured Mode, Safe Read request that we have
+  // acked with errors
+  private AtomicInteger receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0);
   // Number of updates sent in Assured Mode, Safe Data
   private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0);
   // Number of updates sent in Assured Mode, Safe Data, that have been
@@ -765,86 +773,12 @@
     }
 
     numRcvdUpdates.incrementAndGet();
-    return update;
-  }
-
-  /**
-   * Wait for the processing of an assured message.
-   *
-   * @param msg The UpdateMsg for which we are waiting for an ack.
-   * @throws TimeoutException When the configured timeout occurs waiting for the
-   * ack.
-   */
-  private void waitForAck(UpdateMsg msg) throws TimeoutException
-  {
-    // Wait for the ack to be received, timing out if necessary
-    long startTime = System.currentTimeMillis();
-    synchronized (msg)
+    if (update.isAssured() && (update.getAssuredMode() ==
+      AssuredMode.SAFE_READ_MODE))
     {
-      ChangeNumber cn = msg.getChangeNumber();
-      while (waitingAckMsgs.containsKey(cn))
-      {
-        try
-        {
-          // WARNING: this timeout may be difficult to optimize: too low, it
-          // may use too much CPU, too high, it may penalize performance...
-          msg.wait(10);
-        } catch (InterruptedException e)
-        {
-          if (debugEnabled())
-          {
-            TRACER.debugInfo("waitForAck method interrupted for replication " +
-              "serviceID: " + serviceID);
-          }
-          break;
-        }
-        // Timeout ?
-        if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
-        {
-          // Timeout occured, be sure that ack is not being received and if so,
-          // remove the update from the wait list, log the timeout error and
-          // also update assured monitoring counters
-          UpdateMsg update;
-          synchronized (waitingAckMsgs)
-          {
-            update = waitingAckMsgs.remove(cn);
-          }
-
-          if (update != null)
-          {
-            // No luck, this is a real timeout
-            // Increment assured replication monitoring counters
-            switch (msg.getAssuredMode())
-            {
-              case SAFE_READ_MODE:
-                assuredSrNotAcknowledgedUpdates.incrementAndGet();
-                assuredSrTimeoutUpdates.incrementAndGet();
-                // Increment number of errors for our RS
-                updateAssuredErrorsByServer(
-                  assuredSrServerNotAcknowledgedUpdates,
-                  broker.getRsServerId());
-                break;
-              case SAFE_DATA_MODE:
-                assuredSdTimeoutUpdates.incrementAndGet();
-                // Increment number of errors for our RS
-                updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
-                  broker.getRsServerId());
-                break;
-              default:
-              // Should not happen
-            }
-
-            throw new TimeoutException("No ack received for message cn: " + cn +
-              " and replication servceID: " + serviceID + " after " +
-              assuredTimeout + " ms.");
-          } else
-          {
-            // Ack received just before timeout limit: we can exit
-            break;
-          }
-        }
-      }
+      receivedAssuredSrUpdates.incrementAndGet();
     }
+    return update;
   }
 
   /**
@@ -2002,7 +1936,50 @@
    */
   public Map<Short, Integer> getAssuredSrServerNotAcknowledgedUpdates()
   {
-    return assuredSrServerNotAcknowledgedUpdates;
+    // Clone a snapshot with synchronized section to have a consistent view in
+    // monitoring
+    Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
+    synchronized(assuredSrServerNotAcknowledgedUpdates)
+    {
+      Set<Short> keySet = assuredSrServerNotAcknowledgedUpdates.keySet();
+      for (Short serverId : keySet)
+      {
+        Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId);
+        snapshot.put(serverId, i);
+      }
+    }
+    return snapshot;
+  }
+
+  /**
+   * Gets the number of updates received in assured safe read mode request.
+   * @return The number of updates received in assured safe read mode request.
+   */
+  public int getReceivedAssuredSrUpdates()
+  {
+    return receivedAssuredSrUpdates.get();
+  }
+
+  /**
+   * Gets the number of updates received in assured safe read mode that we acked
+   * without error (no replay error).
+   * @return The number of updates received in assured safe read mode that we
+   * acked without error (no replay error).
+   */
+  public int getReceivedAssuredSrUpdatesAcked()
+  {
+    return this.receivedAssuredSrUpdatesAcked.get();
+  }
+
+  /**
+   * Gets the number of updates received in assured safe read mode that we did
+   * not ack due to error (replay error).
+   * @return The number of updates received in assured safe read mode that we
+   * did not ack due to error (replay error).
+   */
+  public int getReceivedAssuredSrUpdatesNotAcked()
+  {
+    return this.receivedAssuredSrUpdatesNotAcked.get();
   }
 
   /**
@@ -2044,7 +2021,19 @@
    */
   public Map<Short, Integer> getAssuredSdServerTimeoutUpdates()
   {
-    return assuredSdServerTimeoutUpdates;
+    // Clone a snapshot with synchronized section to have a consistent view in
+    // monitoring
+    Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
+    synchronized(assuredSdServerTimeoutUpdates)
+    {
+      Set<Short> keySet = assuredSdServerTimeoutUpdates.keySet();
+      for (Short serverId : keySet)
+      {
+        Integer i = assuredSdServerTimeoutUpdates.get(serverId);
+        snapshot.put(serverId, i);
+      }
+    }
+    return snapshot;
   }
 
   /**
@@ -2068,6 +2057,9 @@
     assuredSrWrongStatusUpdates = new AtomicInteger(0);
     assuredSrReplayErrorUpdates = new AtomicInteger(0);
     assuredSrServerNotAcknowledgedUpdates = new HashMap<Short,Integer>();
+    receivedAssuredSrUpdates = new AtomicInteger(0);
+    receivedAssuredSrUpdatesAcked = new AtomicInteger(0);
+    receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0);
     assuredSdSentUpdates = new AtomicInteger(0);
     assuredSdAcknowledgedUpdates = new AtomicInteger(0);
     assuredSdTimeoutUpdates = new AtomicInteger(0);
@@ -2199,16 +2191,20 @@
    * @param windowSize         The window size that this domain should use.
    * @param heartbeatInterval  The heartbeatInterval that this domain should
    *                           use.
+   * @param groupId            The new group id to use
    */
   public void changeConfig(
       Collection<String> replicationServers,
       int windowSize,
-      long heartbeatInterval)
+      long heartbeatInterval,
+      byte groupId)
   {
+    this.groupId = groupId;
+
     if (broker != null)
     {
       if (broker.changeConfig(
-          replicationServers, windowSize, heartbeatInterval))
+          replicationServers, windowSize, heartbeatInterval, groupId))
       {
         disableService();
         enableService();
@@ -2319,6 +2315,13 @@
               ackMsg.setFailedServers(idList);
             }
             broker.publish(ackMsg);
+            if (replayErrorMsg != null)
+            {
+              receivedAssuredSrUpdatesNotAcked.incrementAndGet();
+            } else
+            {
+              receivedAssuredSrUpdatesAcked.incrementAndGet();
+            }
           }
         } else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
         {
@@ -2338,28 +2341,39 @@
   }
 
   /**
-   * Publish an {@link UpdateMsg} to the Replication Service.
-   * <p>
-   * The Replication Service will handle the delivery of this {@link UpdateMsg}
-   * to all the participants of this Replication Domain.
-   * These members will be receive this {@link UpdateMsg} through a call
-   * of the {@link #processUpdate(UpdateMsg)} message.
+   * Prepare a message if it is to be sent in assured mode.
+   * If the assured mode is enabled, this method should be called before
+   * publish(UpdateMsg msg) method. This will configure the update accordingly
+   * before it is sent and will prepare the mechanism that will block until the
+   * matching ack is received. To wait for the ack after publish call, use
+   * the waitForAckIfAssuredEnabled() method.
+   * The expected typical usage in a service inheriting from this class is
+   * the following sequence:
+   * UpdateMsg msg = xxx;
+   * prepareWaitForAckIfAssuredEnabled(msg);
+   * publish(msg);
+   * waitForAckIfAssuredEnabled(msg);
    *
-   * @param msg The UpdateMsg that should be pushed.
-   * @throws TimeoutException When assured replication is enabled and the
-   * configured timeout occurs when blocked waiting for the ack.
+   * Note: prepareWaitForAckIfAssuredEnabled and waitForAckIfAssuredEnabled have
+   * no effect if assured replication is disabled.
+   * Note: this mechanism should not be used if using publish(byte[] msg)
+   * version as usage of these methods is already hidden inside.
+   *
+   * @param msg The update message to be sent soon.
    */
-  public void publish(UpdateMsg msg) throws TimeoutException
+  protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
   {
     byte rsGroupId = broker.getRsGroupId();
-
-    // If assured configured, set message accordingly to request an ack in the
-    // right assured mode.
-    // No ack requested for a RS with a different group id. Assured replication
-    // suported for the same locality, i.e: a topology working in the same
-    // geographical location). If we are connected to a RS which is not in our
-    // locality, no need to ask for an ack.
-    if ( assured && ( rsGroupId == groupId ) )
+    /*
+     * If assured configured, set message accordingly to request an ack in the
+     * right assured mode.
+     * No ack requested for a RS with a different group id. Assured
+     * replication is supported for the same locality (i.e. a topology
+     * working in the same geographical location).
+     * If we are connected to a RS which is not in our
+     * locality, no need to ask for an ack.
+     */
+    if (assured && (rsGroupId == groupId))
     {
       msg.setAssured(true);
       msg.setAssuredMode(assuredMode);
@@ -2373,18 +2387,29 @@
         waitingAckMsgs.put(msg.getChangeNumber(), msg);
       }
     }
+  }
 
-    // Publish the update
-    broker.publish(msg);
-    state.update(msg.getChangeNumber());
-    numSentUpdates.incrementAndGet();
+  /**
+   * Wait for the processing of an assured message after it has been sent, if
+   * assured replication is configured, otherwise, do nothing.
+   * The prepareWaitForAckIfAssuredEnabled method should have been called
+   * before, see its comment for the full picture.
+   *
+   * @param msg The UpdateMsg for which we are waiting for an ack.
+   * @throws TimeoutException When the configured timeout occurs waiting for the
+   * ack.
+   */
+  protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
+    throws TimeoutException
+  {
+    byte rsGroupId = broker.getRsGroupId();
 
     // If assured mode configured, wait for acknowledgement for the just sent
     // message
-    if ( assured && ( rsGroupId == groupId ) )
+    if (assured && (rsGroupId == groupId))
     {
       // Increment assured replication monitoring counters
-      switch(assuredMode)
+      switch (assuredMode)
       {
         case SAFE_READ_MODE:
           assuredSrSentUpdates.incrementAndGet();
@@ -2393,12 +2418,100 @@
           assuredSdSentUpdates.incrementAndGet();
           break;
         default:
-          // Should not happen
+        // Should not happen
       }
-
-      // Now wait for ack matching the sent assured update
-      waitForAck(msg);
+    } else
+    {
+      // Not assured or bad group id, return immediately
+      return;
     }
+
+    // Wait for the ack to be received, timing out if necessary
+    long startTime = System.currentTimeMillis();
+    synchronized (msg)
+    {
+      ChangeNumber cn = msg.getChangeNumber();
+      while (waitingAckMsgs.containsKey(cn))
+      {
+        try
+        {
+          // WARNING: this timeout may be difficult to optimize: too low, it
+          // may use too much CPU, too high, it may penalize performance...
+          msg.wait(10);
+        } catch (InterruptedException e)
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugInfo("waitForAck method interrupted for replication " +
+              "serviceID: " + serviceID);
+          }
+          break;
+        }
+        // Timeout ?
+        if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
+        {
+          // Timeout occurred, be sure that ack is not being received and if so,
+          // remove the update from the wait list, log the timeout error and
+          // also update assured monitoring counters
+          UpdateMsg update;
+          synchronized (waitingAckMsgs)
+          {
+            update = waitingAckMsgs.remove(cn);
+          }
+
+          if (update != null)
+          {
+            // No luck, this is a real timeout
+            // Increment assured replication monitoring counters
+            switch (msg.getAssuredMode())
+            {
+              case SAFE_READ_MODE:
+                assuredSrNotAcknowledgedUpdates.incrementAndGet();
+                assuredSrTimeoutUpdates.incrementAndGet();
+                // Increment number of errors for our RS
+                updateAssuredErrorsByServer(
+                  assuredSrServerNotAcknowledgedUpdates,
+                  broker.getRsServerId());
+                break;
+              case SAFE_DATA_MODE:
+                assuredSdTimeoutUpdates.incrementAndGet();
+                // Increment number of errors for our RS
+                updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
+                  broker.getRsServerId());
+                break;
+              default:
+              // Should not happen
+            }
+
+            throw new TimeoutException("No ack received for message cn: " + cn +
+              " and replication servceID: " + serviceID + " after " +
+              assuredTimeout + " ms.");
+          } else
+          {
+            // Ack received just before timeout limit: we can exit
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Publish an {@link UpdateMsg} to the Replication Service.
+   * <p>
+   * The Replication Service will handle the delivery of this {@link UpdateMsg}
+   * to all the participants of this Replication Domain.
+   * These members will receive this {@link UpdateMsg} through a call
+   * of the {@link #processUpdate(UpdateMsg)} message.
+   *
+   * @param msg The UpdateMsg that should be pushed.
+   */
+  public void publish(UpdateMsg msg)
+  {
+    // Publish the update
+    broker.publish(msg);
+    state.update(msg.getChangeNumber());
+    numSentUpdates.incrementAndGet();
   }
 
   /**
@@ -2410,12 +2523,27 @@
   public void publish(byte[] msg)
   {
     UpdateMsg update = new UpdateMsg(generator.newChangeNumber(), msg);
+
+    // If assured replication is configured, this will prepare blocking
+    // mechanism. If assured replication is disabled, this returns
+    // immediately
+    prepareWaitForAckIfAssuredEnabled(update);
+
+    publish(update);
+
     try
     {
-      publish(update);
-    } catch (TimeoutException e)
+      // If assured replication is enabled, this will wait for the matching
+      // ack or time out. If assured replication is disabled, this returns
+      // immediately
+      waitForAckIfAssuredEnabled(update);
+    } catch (TimeoutException ex)
     {
-      // Should never happen as assured mode not requested
+      // Only raised when assured replication is enabled. Identify the update
+      // by its change number: toString() on the raw byte[] is not meaningful
+      Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString(
+        assuredTimeout), update.getChangeNumber().toString());
+      logError(errorMsg);
     }
   }
 
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java b/opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java
index e677827..69efcf9 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java
@@ -210,6 +210,15 @@
       attributes.add(builder.toAttribute());
     }
 
+    addMonitorData(attributes, "received-assured-sr-updates",
+      domain.getReceivedAssuredSrUpdates());
+
+    addMonitorData(attributes, "received-assured-sr-updates-acked",
+      domain.getReceivedAssuredSrUpdatesAcked());
+
+    addMonitorData(attributes, "received-assured-sr-updates-not-acked",
+      domain.getReceivedAssuredSrUpdatesNotAcked());
+
     addMonitorData(attributes, "assured-sd-sent-updates",
       domain.getAssuredSdSentUpdates());
 
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index d08f668..3d4941d 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -622,7 +622,9 @@
         UUID.randomUUID().toString());
 
       // Send it (this uses the defined assured conf at constructor time)
+      prepareWaitForAckIfAssuredEnabled(delMsg);
       publish(delMsg);
+      waitForAckIfAssuredEnabled(delMsg);
     }
   }
 

--
Gitblit v1.10.0