mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
18.13.2008 a719d21181a3b1c98c16bc677e892cf67fed4e7f
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -215,7 +215,7 @@
  // Safe Data level (used when assuredMode is SAFE_DATA)
  private byte assuredSdLevel = (byte)1;
  // The timeout in ms that should be used, when waiting for assured acks
  private long assuredTimeout = 1000;
  private long assuredTimeout = 2000;
  // Group id
  private byte groupId = (byte)1;
@@ -257,6 +257,14 @@
  // format: <server id>:<number of failed updates>
  private Map<Short,Integer> assuredSrServerNotAcknowledgedUpdates =
    new HashMap<Short,Integer>();
  // Number of updates received in Assured Mode, Safe Read request
  private AtomicInteger receivedAssuredSrUpdates = new AtomicInteger(0);
  // Number of updates received in Assured Mode, Safe Read request that we have
  // acked without errors
  private AtomicInteger receivedAssuredSrUpdatesAcked = new AtomicInteger(0);
  // Number of updates received in Assured Mode, Safe Read request that we have
  // acked with errors
  private AtomicInteger receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0);
  // Number of updates sent in Assured Mode, Safe Data
  private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0);
  // Number of updates sent in Assured Mode, Safe Data, that have been
@@ -765,86 +773,12 @@
    }
    numRcvdUpdates.incrementAndGet();
    return update;
  }
  /**
   * Wait for the processing of an assured message.
   *
   * @param msg The UpdateMsg for which we are waiting for an ack.
   * @throws TimeoutException When the configured timeout occurs waiting for the
   * ack.
   */
  private void waitForAck(UpdateMsg msg) throws TimeoutException
  {
    // Wait for the ack to be received, timing out if necessary
    long startTime = System.currentTimeMillis();
    synchronized (msg)
    if (update.isAssured() && (update.getAssuredMode() ==
      AssuredMode.SAFE_READ_MODE))
    {
      ChangeNumber cn = msg.getChangeNumber();
      while (waitingAckMsgs.containsKey(cn))
      {
        try
        {
          // WARNING: this timeout may be difficult to optimize: too low, it
          // may use too much CPU, too high, it may penalize performance...
          msg.wait(10);
        } catch (InterruptedException e)
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("waitForAck method interrupted for replication " +
              "serviceID: " + serviceID);
          }
          break;
        }
        // Timeout ?
        if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
        {
          // Timeout occured, be sure that ack is not being received and if so,
          // remove the update from the wait list, log the timeout error and
          // also update assured monitoring counters
          UpdateMsg update;
          synchronized (waitingAckMsgs)
          {
            update = waitingAckMsgs.remove(cn);
          }
          if (update != null)
          {
            // No luck, this is a real timeout
            // Increment assured replication monitoring counters
            switch (msg.getAssuredMode())
            {
              case SAFE_READ_MODE:
                assuredSrNotAcknowledgedUpdates.incrementAndGet();
                assuredSrTimeoutUpdates.incrementAndGet();
                // Increment number of errors for our RS
                updateAssuredErrorsByServer(
                  assuredSrServerNotAcknowledgedUpdates,
                  broker.getRsServerId());
                break;
              case SAFE_DATA_MODE:
                assuredSdTimeoutUpdates.incrementAndGet();
                // Increment number of errors for our RS
                updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
                  broker.getRsServerId());
                break;
              default:
              // Should not happen
            }
            throw new TimeoutException("No ack received for message cn: " + cn +
              " and replication servceID: " + serviceID + " after " +
              assuredTimeout + " ms.");
          } else
          {
            // Ack received just before timeout limit: we can exit
            break;
          }
        }
      }
      receivedAssuredSrUpdates.incrementAndGet();
    }
    return update;
  }
  /**
@@ -2002,7 +1936,50 @@
   */
  public Map<Short, Integer> getAssuredSrServerNotAcknowledgedUpdates()
  {
    return assuredSrServerNotAcknowledgedUpdates;
    // Clone a snapshot with synchronized section to have a consistent view in
    // monitoring
    Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
    synchronized(assuredSrServerNotAcknowledgedUpdates)
    {
      Set<Short> keySet = assuredSrServerNotAcknowledgedUpdates.keySet();
      for (Short serverId : keySet)
      {
        Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId);
        snapshot.put(serverId, i);
      }
    }
    return snapshot;
  }
  /**
   * Gets the number of updates received in assured safe read mode request.
   * @return The number of updates received in assured safe read mode request.
   */
  public int getReceivedAssuredSrUpdates()
  {
    return receivedAssuredSrUpdates.get();
  }
  /**
   * Gets the number of updates received in assured safe read mode that we acked
   * without error (no replay error).
   * @return The number of updates received in assured safe read mode that we
   * acked without error (no replay error).
   */
  public int getReceivedAssuredSrUpdatesAcked()
  {
    return this.receivedAssuredSrUpdatesAcked.get();
  }
  /**
   * Gets the number of updates received in assured safe read mode that we did
   * not ack due to error (replay error).
   * @return The number of updates received in assured safe read mode that we
   * did not ack due to error (replay error).
   */
  public int getReceivedAssuredSrUpdatesNotAcked()
  {
    return this.receivedAssuredSrUpdatesNotAcked.get();
  }
  /**
@@ -2044,7 +2021,19 @@
   */
  public Map<Short, Integer> getAssuredSdServerTimeoutUpdates()
  {
    return assuredSdServerTimeoutUpdates;
    // Clone a snapshot with synchronized section to have a consistent view in
    // monitoring
    Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
    synchronized(assuredSdServerTimeoutUpdates)
    {
      Set<Short> keySet = assuredSdServerTimeoutUpdates.keySet();
      for (Short serverId : keySet)
      {
        Integer i = assuredSdServerTimeoutUpdates.get(serverId);
        snapshot.put(serverId, i);
      }
    }
    return snapshot;
  }
  /**
@@ -2068,6 +2057,9 @@
    assuredSrWrongStatusUpdates = new AtomicInteger(0);
    assuredSrReplayErrorUpdates = new AtomicInteger(0);
    assuredSrServerNotAcknowledgedUpdates = new HashMap<Short,Integer>();
    receivedAssuredSrUpdates = new AtomicInteger(0);
    receivedAssuredSrUpdatesAcked = new AtomicInteger(0);
    receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0);
    assuredSdSentUpdates = new AtomicInteger(0);
    assuredSdAcknowledgedUpdates = new AtomicInteger(0);
    assuredSdTimeoutUpdates = new AtomicInteger(0);
@@ -2199,16 +2191,20 @@
   * @param windowSize         The window size that this domain should use.
   * @param heartbeatInterval  The heartbeatInterval that this domain should
   *                           use.
   * @param groupId            The new group id to use
   */
  public void changeConfig(
      Collection<String> replicationServers,
      int windowSize,
      long heartbeatInterval)
      long heartbeatInterval,
      byte groupId)
  {
    this.groupId = groupId;
    if (broker != null)
    {
      if (broker.changeConfig(
          replicationServers, windowSize, heartbeatInterval))
          replicationServers, windowSize, heartbeatInterval, groupId))
      {
        disableService();
        enableService();
@@ -2319,6 +2315,13 @@
              ackMsg.setFailedServers(idList);
            }
            broker.publish(ackMsg);
            if (replayErrorMsg != null)
            {
              receivedAssuredSrUpdatesNotAcked.incrementAndGet();
            } else
            {
              receivedAssuredSrUpdatesAcked.incrementAndGet();
            }
          }
        } else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
        {
@@ -2338,28 +2341,39 @@
  }
  /**
   * Publish an {@link UpdateMsg} to the Replication Service.
   * <p>
   * The Replication Service will handle the delivery of this {@link UpdateMsg}
   * to all the participants of this Replication Domain.
   * These members will be receive this {@link UpdateMsg} through a call
   * of the {@link #processUpdate(UpdateMsg)} message.
   * Prepare a message if it is to be sent in assured mode.
   * If the assured mode is enabled, this method should be called before
   * publish(UpdateMsg msg) method. This will configure the update accordingly
   * before it is sent and will prepare the mechanism that will block until the
   * matching ack is received. To wait for the ack after publish call, use
   * the waitForAckIfAssuredEnabled() method.
   * The expected typical usage in a service inheriting from this class is
   * the following sequence:
   * UpdateMsg msg = xxx;
   * prepareWaitForAckIfAssuredEnabled(msg);
   * publish(msg);
   * waitForAckIfAssuredEnabled(msg);
   *
   * @param msg The UpdateMsg that should be pushed.
   * @throws TimeoutException When assured replication is enabled and the
   * configured timeout occurs when blocked waiting for the ack.
   * Note: prepareWaitForAckIfAssuredEnabled and waitForAckIfAssuredEnabled have
   * no effect if assured replication is disabled.
   * Note: this mechanism should not be used if using publish(byte[] msg)
   * version as usage of these methods is already hidden inside.
   *
   * @param msg The update message to be sent soon.
   */
  public void publish(UpdateMsg msg) throws TimeoutException
  protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
  {
    byte rsGroupId = broker.getRsGroupId();
    // If assured configured, set message accordingly to request an ack in the
    // right assured mode.
    // No ack requested for a RS with a different group id. Assured replication
    // suported for the same locality, i.e: a topology working in the same
    // geographical location). If we are connected to a RS which is not in our
    // locality, no need to ask for an ack.
    if ( assured && ( rsGroupId == groupId ) )
    /*
     * If assured configured, set message accordingly to request an ack in the
     * right assured mode.
     * No ack requested for a RS with a different group id. Assured
     * replication suported for the same locality, i.e: a topology working in
     * the same
     * geographical location). If we are connected to a RS which is not in our
     * locality, no need to ask for an ack.
     */
    if (assured && (rsGroupId == groupId))
    {
      msg.setAssured(true);
      msg.setAssuredMode(assuredMode);
@@ -2373,18 +2387,29 @@
        waitingAckMsgs.put(msg.getChangeNumber(), msg);
      }
    }
  }
    // Publish the update
    broker.publish(msg);
    state.update(msg.getChangeNumber());
    numSentUpdates.incrementAndGet();
  /**
   * Wait for the processing of an assured message after it has been sent, if
   * assured replication is configured, otherwise, do nothing.
   * The prepareWaitForAckIfAssuredEnabled method should have been called
   * before, see its comment for the full picture.
   *
   * @param msg The UpdateMsg for which we are waiting for an ack.
   * @throws TimeoutException When the configured timeout occurs waiting for the
   * ack.
   */
  protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
    throws TimeoutException
  {
    byte rsGroupId = broker.getRsGroupId();
    // If assured mode configured, wait for acknowledgement for the just sent
    // message
    if ( assured && ( rsGroupId == groupId ) )
    if (assured && (rsGroupId == groupId))
    {
      // Increment assured replication monitoring counters
      switch(assuredMode)
      switch (assuredMode)
      {
        case SAFE_READ_MODE:
          assuredSrSentUpdates.incrementAndGet();
@@ -2393,12 +2418,100 @@
          assuredSdSentUpdates.incrementAndGet();
          break;
        default:
          // Should not happen
        // Should not happen
      }
      // Now wait for ack matching the sent assured update
      waitForAck(msg);
    } else
    {
      // Not assured or bad group id, return immediately
      return;
    }
    // Wait for the ack to be received, timing out if necessary
    long startTime = System.currentTimeMillis();
    synchronized (msg)
    {
      ChangeNumber cn = msg.getChangeNumber();
      while (waitingAckMsgs.containsKey(cn))
      {
        try
        {
          // WARNING: this timeout may be difficult to optimize: too low, it
          // may use too much CPU, too high, it may penalize performance...
          msg.wait(10);
        } catch (InterruptedException e)
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("waitForAck method interrupted for replication " +
              "serviceID: " + serviceID);
          }
          break;
        }
        // Timeout ?
        if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
        {
          // Timeout occured, be sure that ack is not being received and if so,
          // remove the update from the wait list, log the timeout error and
          // also update assured monitoring counters
          UpdateMsg update;
          synchronized (waitingAckMsgs)
          {
            update = waitingAckMsgs.remove(cn);
          }
          if (update != null)
          {
            // No luck, this is a real timeout
            // Increment assured replication monitoring counters
            switch (msg.getAssuredMode())
            {
              case SAFE_READ_MODE:
                assuredSrNotAcknowledgedUpdates.incrementAndGet();
                assuredSrTimeoutUpdates.incrementAndGet();
                // Increment number of errors for our RS
                updateAssuredErrorsByServer(
                  assuredSrServerNotAcknowledgedUpdates,
                  broker.getRsServerId());
                break;
              case SAFE_DATA_MODE:
                assuredSdTimeoutUpdates.incrementAndGet();
                // Increment number of errors for our RS
                updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
                  broker.getRsServerId());
                break;
              default:
              // Should not happen
            }
            throw new TimeoutException("No ack received for message cn: " + cn +
              " and replication servceID: " + serviceID + " after " +
              assuredTimeout + " ms.");
          } else
          {
            // Ack received just before timeout limit: we can exit
            break;
          }
        }
      }
    }
  }
  /**
   * Publish an {@link UpdateMsg} to the Replication Service.
   * <p>
   * The Replication Service will handle the delivery of this {@link UpdateMsg}
   * to all the participants of this Replication Domain.
   * These members will be receive this {@link UpdateMsg} through a call
   * of the {@link #processUpdate(UpdateMsg)} message.
   *
   * @param msg The UpdateMsg that should be pushed.
   */
  public void publish(UpdateMsg msg)
  {
    // Publish the update
    broker.publish(msg);
    state.update(msg.getChangeNumber());
    numSentUpdates.incrementAndGet();
  }
  /**
@@ -2410,12 +2523,27 @@
  public void publish(byte[] msg)
  {
    UpdateMsg update = new UpdateMsg(generator.newChangeNumber(), msg);
    // If assured replication is configured, this will prepare blocking
    // mechanism. If assured replication is disabled, this returns
    // immediately
    prepareWaitForAckIfAssuredEnabled(update);
    publish(update);
    try
    {
      publish(update);
    } catch (TimeoutException e)
      // If assured replication is enabled, this will wait for the matching
      // ack or time out. If assured replication is disabled, this returns
      // immediately
      waitForAckIfAssuredEnabled(update);
    } catch (TimeoutException ex)
    {
      // Should never happen as assured mode not requested
      // This exception may only be raised if assured replication is
      // enabled
      Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString(
        assuredTimeout), msg.toString());
      logError(errorMsg);
    }
  }