mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
18.13.2008 a719d21181a3b1c98c16bc677e892cf67fed4e7f
Assured Replication:
- support for dynamic reconfiguration (domain and replication server)
- performance improvement in domain (less lock time between sending threads)
- performance improvement in server (safe data ack before DB push)
- more monitoring info for safe read mode
Misc:
- support for dynamic domain group id reconfiguration

9 files modified
576 ■■■■■ changed files
opends/src/messages/messages/replication.properties 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 122 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java 14 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 68 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 350 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java 9 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 2 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -349,7 +349,7 @@
 generation ID=%s when expected generation ID=%s
NOTICE_DS_RECEIVED_ACK_ERROR_147=In replication service %s and server id %s, the \
 assured update message %s was acknowledged with the following errors: %s
SEVERE_ERR_DS_ACK_TIMEOUT_148=In replication service %s, timeout after %s ms \
NOTICE_DS_ACK_TIMEOUT_148=In replication service %s, timeout after %s ms \
 waiting for the acknowledgement of the assured update message: %s
SEVERE_ERR_DS_UNKNOWN_ASSURED_MODE_149=In directory server %s, received unknown \
 assured update mode: %s, for domain %s. Message: %s
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -59,6 +59,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
@@ -318,26 +319,8 @@
    configDn = configuration.dn();
    this.updateToReplayQueue = updateToReplayQueue;
    /*
     * Fill assured configuration properties
     */
    AssuredType assuredType = configuration.getAssuredType();
    switch (assuredType)
    {
      case NOT_ASSURED:
        setAssured(false);
        break;
      case SAFE_DATA:
        setAssured(true);
        setAssuredMode(AssuredMode.SAFE_DATA_MODE);
        break;
      case SAFE_READ:
        setAssured(true);
        setAssuredMode(AssuredMode.SAFE_READ_MODE);
        break;
    }
    setAssuredSdLevel((byte)configuration.getAssuredSdLevel());
    setAssuredTimeout(configuration.getAssuredTimeout());
    // Get assured configuration
    readAssuredConfig(configuration);
    setGroupId((byte)configuration.getGroupId());
    setURLs(configuration.getReferralsUrl());
@@ -405,6 +388,72 @@
  }
  /**
   * Gets and stores the assured replication configuration parameters. Returns
   * a boolean indicating if the passed configuration has changed compared to
   * previous values and the changes require a reconnection.
   * @param configuration The configuration object
   * @return True if the assured configuration changed and we need to reconnect
   */
  private boolean readAssuredConfig(ReplicationDomainCfg configuration)
  {
    boolean needReconnect = false;
    byte newSdLevel = (byte) configuration.getAssuredSdLevel();
    if ((isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)) &&
      (newSdLevel != getAssuredSdLevel()))
    {
      needReconnect = true;
    }
    AssuredType newAssuredType = configuration.getAssuredType();
    switch (newAssuredType)
    {
      case NOT_ASSURED:
        if (isAssured())
        {
          needReconnect = true;
        }
        break;
      case SAFE_DATA:
        if (!isAssured() ||
          (isAssured() && (getAssuredMode() == AssuredMode.SAFE_READ_MODE)))
        {
          needReconnect = true;
        }
        break;
      case SAFE_READ:
        if (!isAssured() ||
          (isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)))
        {
          needReconnect = true;
        }
        break;
    }
    switch (newAssuredType)
    {
      case NOT_ASSURED:
        setAssured(false);
        break;
      case SAFE_DATA:
        setAssured(true);
        setAssuredMode(AssuredMode.SAFE_DATA_MODE);
        break;
      case SAFE_READ:
        setAssured(true);
        setAssuredMode(AssuredMode.SAFE_READ_MODE);
        break;
    }
    setAssuredSdLevel(newSdLevel);
    // Changing timeout does not require restart as it is not sent in
    // StartSessionMsg
    setAssuredTimeout(configuration.getAssuredTimeout());
    return needReconnect;
  }
  /**
   * Returns the base DN of this ReplicationDomain.
   *
   * @return The base DN of this ReplicationDomain
@@ -836,7 +885,27 @@
    if (!op.isSynchronizationOperation())
    {
      // If assured replication is configured, this will prepare blocking
      // mechanism. If assured replication is disabled, this returns
      // immediately
      prepareWaitForAckIfAssuredEnabled(msg);
      pendingChanges.pushCommittedChanges();
      // If assured replication is enabled, this will wait for the matching
      // ack or time out. If assured replication is disabled, this returns
      // immediately
      try
      {
        waitForAckIfAssuredEnabled(msg);
      } catch (TimeoutException ex)
      {
        // This exception may only be raised if assured replication is
        // enabled
        Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getServiceID(),
          Long.toString(getAssuredTimeout()), msg.toString());
        logError(errorMsg);
      }
    }
  }
@@ -2583,7 +2652,18 @@
    changeConfig(
        configuration.getReplicationServer(),
        configuration.getWindowSize(),
        configuration.getHeartbeatInterval());
        configuration.getHeartbeatInterval(),
        (byte)configuration.getGroupId());
    // Get assured configuration
    boolean needReconnect = readAssuredConfig(configuration);
    // Reconnect if required
    if (needReconnect)
    {
      disableService();
      enableService();
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -26,15 +26,11 @@
 */
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeoutException;
import org.opends.messages.Message;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
@@ -183,17 +179,7 @@
      {
        numSentUpdates++;
        LDAPUpdateMsg updateMsg = firstChange.getMsg();
          try
          {
            domain.publish(updateMsg);
          } catch (TimeoutException ex) {
            // This exception may only be raised if assured replication is
            // enabled
            Message errorMsg = ERR_DS_ACK_TIMEOUT.get(
              domain.getServiceID(), Long.toString(domain.getAssuredTimeout()),
              updateMsg.toString());
            logError(errorMsg);
          }
        }
      pendingChanges.remove(firstChangeNumber);
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -656,6 +656,7 @@
    }
    rcvWindow = configuration.getWindowSize();
    assuredTimeout = configuration.getAssuredTimeout();
    // changing the listen port requires to stop the listen thread
    // and restart it.
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -213,40 +213,6 @@
      generationId = sourceHandler.getGenerationId();
    }
    // look for the dbHandler that is responsible for the LDAP server which
    // generated the change.
    DbHandler dbHandler = null;
    synchronized (sourceDbHandlers)
    {
      dbHandler = sourceDbHandlers.get(id);
      if (dbHandler == null)
      {
        try
        {
          dbHandler = replicationServer.newDbHandler(id, baseDn);
          generationIdSavedStatus = true;
        } catch (DatabaseException e)
        {
          /*
           * Because of database problem we can't save any more changes
           * from at least one LDAP server.
           * This replicationServer therefore can't do its job properly anymore
           * and needs to close all its connections and shutdown itself.
           */
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          replicationServer.shutdown();
          return;
        }
        sourceDbHandlers.put(id, dbHandler);
      }
    }
    // Publish the messages to the source handler
    dbHandler.add(update);
    /**
     * If this is an assured message (a message requesting ack), we must
     * construct the ExpectedAcksInfo object with the right number of expected
@@ -297,6 +263,40 @@
      }
    }
    // look for the dbHandler that is responsible for the LDAP server which
    // generated the change.
    DbHandler dbHandler = null;
    synchronized (sourceDbHandlers)
    {
      dbHandler = sourceDbHandlers.get(id);
      if (dbHandler == null)
      {
        try
        {
          dbHandler = replicationServer.newDbHandler(id, baseDn);
          generationIdSavedStatus = true;
        } catch (DatabaseException e)
        {
          /*
           * Because of database problem we can't save any more changes
           * from at least one LDAP server.
           * This replicationServer therefore can't do its job properly anymore
           * and needs to close all its connections and shutdown itself.
           */
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          replicationServer.shutdown();
          return;
        }
        sourceDbHandlers.put(id, dbHandler);
      }
    }
    // Publish the messages to the source handler
    dbHandler.add(update);
    List<Short> expectedServers = null;
    if (assuredMessage)
    {
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -1548,9 +1548,11 @@
   *
   * @return                    A boolean indicating if the changes
   *                            requires to restart the service.
   * @param groupId            The new group id to use
   */
  public boolean changeConfig(
      Collection<String> replicationServers, int window, long heartbeatInterval)
      Collection<String> replicationServers, int window, long heartbeatInterval,
      byte groupId)
  {
    // These parameters need to be renegotiated with the ReplicationServer
    // so if they have changed, that requires restarting the session with
@@ -1563,7 +1565,8 @@
        (!(replicationServers.size() == servers.size()
        && replicationServers.containsAll(servers))) ||
        window != this.maxRcvWindow  ||
        heartbeatInterval != this.heartbeatInterval)
        heartbeatInterval != this.heartbeatInterval ||
        (groupId != this.groupId))
    {
      needToRestartSession = true;
    }
@@ -1573,6 +1576,7 @@
    this.maxRcvWindow = window;
    this.halfRcvWindow = window / 2;
    this.heartbeatInterval = heartbeatInterval;
    this.groupId = groupId;
    return needToRestartSession;
  }
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -215,7 +215,7 @@
  // Safe Data level (used when assuredMode is SAFE_DATA)
  private byte assuredSdLevel = (byte)1;
  // The timeout in ms that should be used, when waiting for assured acks
  private long assuredTimeout = 1000;
  private long assuredTimeout = 2000;
  // Group id
  private byte groupId = (byte)1;
@@ -257,6 +257,14 @@
  // format: <server id>:<number of failed updates>
  private Map<Short,Integer> assuredSrServerNotAcknowledgedUpdates =
    new HashMap<Short,Integer>();
  // Number of updates received in Assured Mode, Safe Read request
  private AtomicInteger receivedAssuredSrUpdates = new AtomicInteger(0);
  // Number of updates received in Assured Mode, Safe Read request that we have
  // acked without errors
  private AtomicInteger receivedAssuredSrUpdatesAcked = new AtomicInteger(0);
  // Number of updates received in Assured Mode, Safe Read request that we have
  // acked with errors
  private AtomicInteger receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0);
  // Number of updates sent in Assured Mode, Safe Data
  private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0);
  // Number of updates sent in Assured Mode, Safe Data, that have been
@@ -765,89 +773,15 @@
    }
    numRcvdUpdates.incrementAndGet();
    if (update.isAssured() && (update.getAssuredMode() ==
      AssuredMode.SAFE_READ_MODE))
    {
      receivedAssuredSrUpdates.incrementAndGet();
    }
    return update;
  }
  /**
   * Wait for the processing of an assured message.
   * <p>
   * Blocks the calling thread until the change number of the passed message is
   * no longer present in the list of messages waiting for an ack, or until the
   * configured assured timeout has elapsed. On a real timeout, the message is
   * removed from the waiting list and the assured monitoring counters matching
   * the assured mode of the message are incremented before the exception is
   * thrown. If the thread is interrupted while waiting, the method returns
   * silently, without throwing.
   *
   * @param msg The UpdateMsg for which we are waiting for an ack.
   * @throws TimeoutException When the configured timeout occurs waiting for the
   * ack.
   */
  private void waitForAck(UpdateMsg msg) throws TimeoutException
  {
    // Wait for the ack to be received, timing out if necessary
    long startTime = System.currentTimeMillis();
    // The message itself is used as the monitor to wait on
    synchronized (msg)
    {
      ChangeNumber cn = msg.getChangeNumber();
      // Loop as long as the message is still registered as waiting for its ack.
      // NOTE(review): this lookup is done without holding the waitingAckMsgs
      // lock, whereas the removal below is done under it - confirm this is
      // intended.
      while (waitingAckMsgs.containsKey(cn))
      {
        try
        {
          // WARNING: this timeout may be difficult to optimize: too low, it
          // may use too much CPU, too high, it may penalize performance...
          // Presumably the code receiving the ack notifies on msg so that this
          // wait ends early - TODO confirm against the ack processing code.
          msg.wait(10);
        } catch (InterruptedException e)
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("waitForAck method interrupted for replication " +
              "serviceID: " + serviceID);
          }
          // Interrupted: give up waiting, no timeout exception is raised
          break;
        }
        // Timeout ?
        if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
        {
          // Timeout occurred, be sure that ack is not being received and if so,
          // remove the update from the wait list, log the timeout error and
          // also update assured monitoring counters
          UpdateMsg update;
          synchronized (waitingAckMsgs)
          {
            // A non null result means the entry was still there, i.e. nobody
            // removed it before the timeout expired
            update = waitingAckMsgs.remove(cn);
          }
          if (update != null)
          {
            // No luck, this is a real timeout
            // Increment assured replication monitoring counters
            switch (msg.getAssuredMode())
            {
              case SAFE_READ_MODE:
                assuredSrNotAcknowledgedUpdates.incrementAndGet();
                assuredSrTimeoutUpdates.incrementAndGet();
                // Increment number of errors for our RS
                updateAssuredErrorsByServer(
                  assuredSrServerNotAcknowledgedUpdates,
                  broker.getRsServerId());
                break;
              case SAFE_DATA_MODE:
                assuredSdTimeoutUpdates.incrementAndGet();
                // Increment number of errors for our RS
                updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
                  broker.getRsServerId());
                break;
              default:
              // Should not happen
            }
            throw new TimeoutException("No ack received for message cn: " + cn +
              " and replication servceID: " + serviceID + " after " +
              assuredTimeout + " ms.");
          } else
          {
            // Ack received just before timeout limit: we can exit
            break;
          }
        }
      }
    }
  }
  /**
   * Updates the passed monitoring list of errors received for assured messages
   * (safe data or safe read, depending of the passed list to update) for a
   * particular server in the list. This increments the counter of error for the
@@ -2002,7 +1936,50 @@
   */
  public Map<Short, Integer> getAssuredSrServerNotAcknowledgedUpdates()
  {
    return assuredSrServerNotAcknowledgedUpdates;
    // Clone a snapshot with synchronized section to have a consistent view in
    // monitoring
    Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
    synchronized(assuredSrServerNotAcknowledgedUpdates)
    {
      Set<Short> keySet = assuredSrServerNotAcknowledgedUpdates.keySet();
      for (Short serverId : keySet)
      {
        Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId);
        snapshot.put(serverId, i);
      }
    }
    return snapshot;
  }
  /**
   * Gets the number of updates received in assured safe read mode request.
   * @return The number of updates received in assured safe read mode request.
   */
  public int getReceivedAssuredSrUpdates()
  {
    return receivedAssuredSrUpdates.get();
  }
  /**
   * Gets the number of updates received in assured safe read mode that we acked
   * without error (no replay error).
   * @return The number of updates received in assured safe read mode that we
   * acked without error (no replay error).
   */
  public int getReceivedAssuredSrUpdatesAcked()
  {
    return this.receivedAssuredSrUpdatesAcked.get();
  }
  /**
   * Gets the number of updates received in assured safe read mode that we did
   * not ack due to error (replay error).
   * @return The number of updates received in assured safe read mode that we
   * did not ack due to error (replay error).
   */
  public int getReceivedAssuredSrUpdatesNotAcked()
  {
    return this.receivedAssuredSrUpdatesNotAcked.get();
  }
  /**
@@ -2044,7 +2021,19 @@
   */
  public Map<Short, Integer> getAssuredSdServerTimeoutUpdates()
  {
    return assuredSdServerTimeoutUpdates;
    // Clone a snapshot with synchronized section to have a consistent view in
    // monitoring
    Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
    synchronized(assuredSdServerTimeoutUpdates)
    {
      Set<Short> keySet = assuredSdServerTimeoutUpdates.keySet();
      for (Short serverId : keySet)
      {
        Integer i = assuredSdServerTimeoutUpdates.get(serverId);
        snapshot.put(serverId, i);
      }
    }
    return snapshot;
  }
  /**
@@ -2068,6 +2057,9 @@
    assuredSrWrongStatusUpdates = new AtomicInteger(0);
    assuredSrReplayErrorUpdates = new AtomicInteger(0);
    assuredSrServerNotAcknowledgedUpdates = new HashMap<Short,Integer>();
    receivedAssuredSrUpdates = new AtomicInteger(0);
    receivedAssuredSrUpdatesAcked = new AtomicInteger(0);
    receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0);
    assuredSdSentUpdates = new AtomicInteger(0);
    assuredSdAcknowledgedUpdates = new AtomicInteger(0);
    assuredSdTimeoutUpdates = new AtomicInteger(0);
@@ -2199,16 +2191,20 @@
   * @param windowSize         The window size that this domain should use.
   * @param heartbeatInterval  The heartbeatInterval that this domain should
   *                           use.
   * @param groupId            The new group id to use
   */
  public void changeConfig(
      Collection<String> replicationServers,
      int windowSize,
      long heartbeatInterval)
      long heartbeatInterval,
      byte groupId)
  {
    this.groupId = groupId;
    if (broker != null)
    {
      if (broker.changeConfig(
          replicationServers, windowSize, heartbeatInterval))
          replicationServers, windowSize, heartbeatInterval, groupId))
      {
        disableService();
        enableService();
@@ -2319,6 +2315,13 @@
              ackMsg.setFailedServers(idList);
            }
            broker.publish(ackMsg);
            if (replayErrorMsg != null)
            {
              receivedAssuredSrUpdatesNotAcked.incrementAndGet();
            } else
            {
              receivedAssuredSrUpdatesAcked.incrementAndGet();
            }
          }
        } else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
        {
@@ -2338,27 +2341,38 @@
  }
  /**
   * Publish an {@link UpdateMsg} to the Replication Service.
   * <p>
   * The Replication Service will handle the delivery of this {@link UpdateMsg}
   * to all the participants of this Replication Domain.
   * These members will be receive this {@link UpdateMsg} through a call
   * of the {@link #processUpdate(UpdateMsg)} message.
   * Prepare a message if it is to be sent in assured mode.
   * If the assured mode is enabled, this method should be called before
   * publish(UpdateMsg msg) method. This will configure the update accordingly
   * before it is sent and will prepare the mechanism that will block until the
   * matching ack is received. To wait for the ack after publish call, use
   * the waitForAckIfAssuredEnabled() method.
   * The expected typical usage in a service inheriting from this class is
   * the following sequence:
   * UpdateMsg msg = xxx;
   * prepareWaitForAckIfAssuredEnabled(msg);
   * publish(msg);
   * waitForAckIfAssuredEnabled(msg);
   *
   * @param msg The UpdateMsg that should be pushed.
   * @throws TimeoutException When assured replication is enabled and the
   * configured timeout occurs when blocked waiting for the ack.
   * Note: prepareWaitForAckIfAssuredEnabled and waitForAckIfAssuredEnabled have
   * no effect if assured replication is disabled.
   * Note: this mechanism should not be used if using publish(byte[] msg)
   * version as usage of these methods is already hidden inside.
   *
   * @param msg The update message to be sent soon.
   */
  public void publish(UpdateMsg msg) throws TimeoutException
  protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
  {
    byte rsGroupId = broker.getRsGroupId();
    // If assured configured, set message accordingly to request an ack in the
    // right assured mode.
    // No ack requested for a RS with a different group id. Assured replication
    // suported for the same locality, i.e: a topology working in the same
    // geographical location). If we are connected to a RS which is not in our
    // locality, no need to ask for an ack.
    /*
     * If assured configured, set message accordingly to request an ack in the
     * right assured mode.
     * No ack is requested from a RS with a different group id: assured
     * replication is supported within the same locality (i.e. a topology
     * working in the same geographical location). If we are connected to a RS
     * which is not in our locality, there is no need to ask for an ack.
     */
    if ( assured && ( rsGroupId == groupId ) )
    {
      msg.setAssured(true);
@@ -2373,11 +2387,22 @@
        waitingAckMsgs.put(msg.getChangeNumber(), msg);
      }
    }
  }
    // Publish the update
    broker.publish(msg);
    state.update(msg.getChangeNumber());
    numSentUpdates.incrementAndGet();
  /**
   * Wait for the processing of an assured message after it has been sent, if
   * assured replication is configured, otherwise, do nothing.
   * The prepareWaitForAckIfAssuredEnabled method should have been called
   * before, see its comment for the full picture.
   *
   * @param msg The UpdateMsg for which we are waiting for an ack.
   * @throws TimeoutException When the configured timeout occurs waiting for the
   * ack.
   */
  protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
    throws TimeoutException
  {
    byte rsGroupId = broker.getRsGroupId();
    // If assured mode configured, wait for acknowledgement for the just sent
    // message
@@ -2395,10 +2420,98 @@
        default:
          // Should not happen
      }
      // Now wait for ack matching the sent assured update
      waitForAck(msg);
    } else
    {
      // Not assured or bad group id, return immediately
      return;
    }
    // Wait for the ack to be received, timing out if necessary
    long startTime = System.currentTimeMillis();
    synchronized (msg)
    {
      ChangeNumber cn = msg.getChangeNumber();
      while (waitingAckMsgs.containsKey(cn))
      {
        try
        {
          // WARNING: this timeout may be difficult to optimize: too low, it
          // may use too much CPU, too high, it may penalize performance...
          msg.wait(10);
        } catch (InterruptedException e)
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("waitForAck method interrupted for replication " +
              "serviceID: " + serviceID);
          }
          break;
        }
        // Timeout ?
        if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
        {
          // Timeout occurred, be sure that ack is not being received and if so,
          // remove the update from the wait list, log the timeout error and
          // also update assured monitoring counters
          UpdateMsg update;
          synchronized (waitingAckMsgs)
          {
            update = waitingAckMsgs.remove(cn);
          }
          if (update != null)
          {
            // No luck, this is a real timeout
            // Increment assured replication monitoring counters
            switch (msg.getAssuredMode())
            {
              case SAFE_READ_MODE:
                assuredSrNotAcknowledgedUpdates.incrementAndGet();
                assuredSrTimeoutUpdates.incrementAndGet();
                // Increment number of errors for our RS
                updateAssuredErrorsByServer(
                  assuredSrServerNotAcknowledgedUpdates,
                  broker.getRsServerId());
                break;
              case SAFE_DATA_MODE:
                assuredSdTimeoutUpdates.incrementAndGet();
                // Increment number of errors for our RS
                updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
                  broker.getRsServerId());
                break;
              default:
              // Should not happen
            }
            throw new TimeoutException("No ack received for message cn: " + cn +
              " and replication servceID: " + serviceID + " after " +
              assuredTimeout + " ms.");
          } else
          {
            // Ack received just before timeout limit: we can exit
            break;
          }
        }
      }
    }
  }
  /**
   * Publish an {@link UpdateMsg} to the Replication Service.
   * <p>
   * The Replication Service will handle the delivery of this {@link UpdateMsg}
   * to all the participants of this Replication Domain.
   * These members will be receive this {@link UpdateMsg} through a call
   * of the {@link #processUpdate(UpdateMsg)} message.
   *
   * @param msg The UpdateMsg that should be pushed.
   */
  public void publish(UpdateMsg msg)
  {
    // Publish the update
    broker.publish(msg);
    state.update(msg.getChangeNumber());
    numSentUpdates.incrementAndGet();
  }
  /**
@@ -2410,12 +2523,27 @@
  public void publish(byte[] msg)
  {
    UpdateMsg update = new UpdateMsg(generator.newChangeNumber(), msg);
    // If assured replication is configured, this will prepare blocking
    // mechanism. If assured replication is disabled, this returns
    // immediately
    prepareWaitForAckIfAssuredEnabled(update);
    publish(update);
    try
    {
      publish(update);
    } catch (TimeoutException e)
      // If assured replication is enabled, this will wait for the matching
      // ack or time out. If assured replication is disabled, this returns
      // immediately
      waitForAckIfAssuredEnabled(update);
    } catch (TimeoutException ex)
    {
      // Should never happen as assured mode not requested
      // This exception may only be raised if assured replication is
      // enabled
      Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString(
        assuredTimeout), msg.toString());
      logError(errorMsg);
    }
  }
opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java
@@ -210,6 +210,15 @@
      attributes.add(builder.toAttribute());
    }
    addMonitorData(attributes, "received-assured-sr-updates",
      domain.getReceivedAssuredSrUpdates());
    addMonitorData(attributes, "received-assured-sr-updates-acked",
      domain.getReceivedAssuredSrUpdatesAcked());
    addMonitorData(attributes, "received-assured-sr-updates-not-acked",
      domain.getReceivedAssuredSrUpdatesNotAcked());
    addMonitorData(attributes, "assured-sd-sent-updates",
      domain.getAssuredSdSentUpdates());
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -622,7 +622,9 @@
        UUID.randomUUID().toString());
      // Send it (this uses the defined assured conf at constructor time)
      prepareWaitForAckIfAssuredEnabled(delMsg);
      publish(delMsg);
      waitForAckIfAssuredEnabled(delMsg);
    }
  }