mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
11.43.2014 0642bd56015b5571ada7bb38b77844c08b574d4c
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -27,7 +27,6 @@
package org.opends.server.replication.server;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -287,16 +286,15 @@
   * Add an update that has been received to the list of
   * updates that must be forwarded to all other servers.
   *
   * @param update  The update that has been received.
   * @param updateMsg  The update that has been received.
   * @param sourceHandler The ServerHandler for the server from which the
   *        update was received
   * @throws IOException When an IO exception happens during the update
   *         processing.
   */
  public void put(UpdateMsg update, ServerHandler sourceHandler)
    throws IOException
  public void put(UpdateMsg updateMsg, ServerHandler sourceHandler) throws IOException
  {
    sourceHandler.updateServerState(update);
    sourceHandler.updateServerState(updateMsg);
    sourceHandler.incrementInCount();
    setGenerationIdIfUnset(sourceHandler.getGenerationId());
@@ -318,78 +316,14 @@
     * data centers, but one will request and wait acks only from servers of the
     * data center 1.
     */
    boolean assuredMessage = update.isAssured();
    PreparedAssuredInfo preparedAssuredInfo = null;
    if (assuredMessage)
    {
      // Assured feature is supported starting from replication protocol V2
      if (sourceHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V2)
      {
        // According to assured sub-mode, prepare structures to keep track of
        // the acks we are interested in.
        AssuredMode assuredMode = update.getAssuredMode();
        if (assuredMode == AssuredMode.SAFE_DATA_MODE)
        {
          sourceHandler.incrementAssuredSdReceivedUpdates();
          preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler);
        } else if (assuredMode == AssuredMode.SAFE_READ_MODE)
        {
          sourceHandler.incrementAssuredSrReceivedUpdates();
          preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler);
        } else
        {
          // Unknown assured mode: should never happen
          logger.error(ERR_RS_UNKNOWN_ASSURED_MODE,
              localReplicationServer.getServerId(), assuredMode, baseDN, update);
          assuredMessage = false;
        }
      } else
      {
        assuredMessage = false;
      }
    }
    final PreparedAssuredInfo preparedAssuredInfo = getPreparedAssuredInfo(updateMsg, sourceHandler);
    if (!publishUpdateMsg(update))
    if (!publishUpdateMsg(updateMsg))
    {
      return;
    }
    List<Integer> expectedServers = null;
    if (assuredMessage)
    {
      expectedServers = preparedAssuredInfo.expectedServers;
      if (expectedServers != null)
      {
        // Store the expected acks info into the global map.
        // The code for processing reception of acks for this update will update
        // info kept in this object and if enough acks received, it will send
        // back the final ack to the requester and remove the object from this
        // map
        // OR
        // The following timer will time out and send an timeout ack to the
        // requester if the acks are not received in time. The timer will also
        // remove the object from this map.
        CSN csn = update.getCSN();
        waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo);
        // Arm timer for this assured update message (wait for acks until it
        // times out)
        AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn);
        assuredTimeoutTimer.schedule(assuredTimeoutTask,
            localReplicationServer.getAssuredTimeout());
        // Purge timer every 100 treated messages
        assuredTimeoutTimerPurgeCounter++;
        if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
        {
          assuredTimeoutTimer.purge();
        }
      }
    }
    if (expectedServers == null)
    {
      expectedServers = Collections.emptyList();
    }
    final List<Integer> assuredServers = getAssuredServers(updateMsg, preparedAssuredInfo);
    /**
     * The update message equivalent to the originally received update message,
@@ -403,7 +337,8 @@
     * references posted on every writer queues. That is why we need a message
     * version with assured flag on and another one with assured flag off.
     */
    NotAssuredUpdateMsg notAssuredUpdate = null;
    final NotAssuredUpdateMsg notAssuredUpdateMsg =
        preparedAssuredInfo != null ? new NotAssuredUpdateMsg(updateMsg) : null;
    // Push the message to the replication servers
    if (sourceHandler.isDataServer())
@@ -414,80 +349,145 @@
         * Ignore updates to RS with bad gen id
         * (no system managed status for a RS)
         */
        if (isDifferentGenerationId(rsHandler.getGenerationId()))
        if (!isDifferentGenerationId(rsHandler, updateMsg))
        {
          if (logger.isTraceEnabled())
          {
            debug("update " + update.getCSN()
                + " will not be sent to replication server "
                + rsHandler.getServerId() + " with generation id "
                + rsHandler.getGenerationId() + " different from local "
                + "generation id " + generationId);
          }
          continue;
          addUpdate(rsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
        }
        notAssuredUpdate = addUpdate(rsHandler, update, notAssuredUpdate,
            assuredMessage, expectedServers);
      }
    }
    // Push the message to the LDAP servers
    for (DataServerHandler dsHandler : connectedDSs.values())
    {
      // Don't forward the change to the server that just sent it
      if (dsHandler == sourceHandler)
      // Do not forward the change to the server that just sent it
      if (dsHandler != sourceHandler
          && !isUpdateMsgFiltered(updateMsg, dsHandler))
      {
        continue;
        addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
      }
      /**
       * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS
       *
       * The RSD lock should not be taken here as it is acceptable to have a
       * delay between the time the server has a wrong status and the fact we
       * detect it: the updates that succeed to pass during this time will have
       * no impact on remote server. But it is interesting to not saturate
       * uselessly the network if the updates are not necessary so this check to
       * stop sending updates is interesting anyway. Not taking the RSD lock
       * allows to have better performances in normal mode (most of the time).
       */
      ServerStatus dsStatus = dsHandler.getStatus();
      if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS
          || dsStatus == ServerStatus.FULL_UPDATE_STATUS)
      {
        if (logger.isTraceEnabled())
        {
          if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
          {
            debug("update " + update.getCSN()
                + " will not be sent to directory server "
                + dsHandler.getServerId() + " with generation id "
                + dsHandler.getGenerationId() + " different from local "
                + "generation id " + generationId);
          }
          if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
          {
            debug("update " + update.getCSN()
                + " will not be sent to directory server "
                + dsHandler.getServerId() + " as it is in full update");
          }
        }
        continue;
      }
      notAssuredUpdate = addUpdate(dsHandler, update, notAssuredUpdate,
          assuredMessage, expectedServers);
    }
    // Push the message to the other subscribing handlers
    for (MessageHandler mHandler : otherHandlers) {
      mHandler.add(update);
      mHandler.add(updateMsg);
    }
  }
  private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler,
      UpdateMsg updateMsg)
  {
    final boolean isDifferent = isDifferentGenerationId(rsHandler.getGenerationId());
    if (isDifferent && logger.isTraceEnabled())
    {
      debug("updateMsg " + updateMsg.getCSN()
          + " will not be sent to replication server "
          + rsHandler.getServerId() + " with generation id "
          + rsHandler.getGenerationId() + " different from local "
          + "generation id " + generationId);
    }
    return isDifferent;
  }
  /**
   * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS.
   * <p>
   * The RSD lock should not be taken here as it is acceptable to have a delay
   * between the time the server has a wrong status and the fact we detect it:
   * the updates that succeed to pass during this time will have no impact on
   * remote server. But it is interesting to not saturate uselessly the network
   * if the updates are not necessary so this check to stop sending updates is
   * interesting anyway. Not taking the RSD lock allows to have better
   * performances in normal mode (most of the time).
   */
  private boolean isUpdateMsgFiltered(UpdateMsg updateMsg, DataServerHandler dsHandler)
  {
    final ServerStatus dsStatus = dsHandler.getStatus();
    if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
    {
      if (logger.isTraceEnabled())
      {
        debug("updateMsg " + updateMsg.getCSN()
            + " will not be sent to directory server "
            + dsHandler.getServerId() + " with generation id "
            + dsHandler.getGenerationId() + " different from local "
            + "generation id " + generationId);
      }
      return true;
    }
    else if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
    {
      if (logger.isTraceEnabled())
      {
        debug("updateMsg " + updateMsg.getCSN()
            + " will not be sent to directory server "
            + dsHandler.getServerId() + " as it is in full update");
      }
      return true;
    }
    return false;
  }
  private PreparedAssuredInfo getPreparedAssuredInfo(UpdateMsg updateMsg,
      ServerHandler sourceHandler) throws IOException
  {
    // Assured feature is supported starting from replication protocol V2
    if (!updateMsg.isAssured()
        || sourceHandler.getProtocolVersion() < REPLICATION_PROTOCOL_V2)
    {
      return null;
    }
    // According to assured sub-mode, prepare structures to keep track of
    // the acks we are interested in.
    switch (updateMsg.getAssuredMode())
    {
    case SAFE_DATA_MODE:
      sourceHandler.incrementAssuredSdReceivedUpdates();
      return processSafeDataUpdateMsg(updateMsg, sourceHandler);
    case SAFE_READ_MODE:
      sourceHandler.incrementAssuredSrReceivedUpdates();
      return processSafeReadUpdateMsg(updateMsg, sourceHandler);
    default:
      // Unknown assured mode: should never happen
      logger.error(ERR_RS_UNKNOWN_ASSURED_MODE,
          localReplicationServer.getServerId(), updateMsg.getAssuredMode(), baseDN, updateMsg);
      return null;
    }
  }
  private List<Integer> getAssuredServers(UpdateMsg updateMsg, PreparedAssuredInfo preparedAssuredInfo)
  {
    List<Integer> expectedServers = null;
    if (preparedAssuredInfo != null && preparedAssuredInfo.expectedServers != null)
    {
      expectedServers = preparedAssuredInfo.expectedServers;
      // Store the expected acks info into the global map.
      // The code for processing reception of acks for this update will update
      // info kept in this object and if enough acks received, it will send
      // back the final ack to the requester and remove the object from this map
      // OR
      // The following timer will time out and send an timeout ack to the
      // requester if the acks are not received in time. The timer will also
      // remove the object from this map.
      final CSN csn = updateMsg.getCSN();
      waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo);
      // Arm timer for this assured update message (wait for acks until it times out)
      final AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn);
      assuredTimeoutTimer.schedule(assuredTimeoutTask, localReplicationServer.getAssuredTimeout());
      // Purge timer every 100 treated messages
      assuredTimeoutTimerPurgeCounter++;
      if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
      {
        assuredTimeoutTimer.purge();
      }
    }
    return expectedServers != null ? expectedServers : Collections.<Integer> emptyList();
  }
  private boolean publishUpdateMsg(UpdateMsg updateMsg)
  {
    try
@@ -533,33 +533,20 @@
    }
  }
  private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler,
      UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate,
      boolean assuredMessage, List<Integer> expectedServers)
      throws UnsupportedEncodingException
  private void addUpdate(ServerHandler sHandler, UpdateMsg updateMsg,
      NotAssuredUpdateMsg notAssuredUpdateMsg, List<Integer> assuredServers)
  {
    if (assuredMessage)
    // Assured mode: post an assured or not assured matching update message
    // according to what has been computed for the destination server
    if (notAssuredUpdateMsg != null
        && !assuredServers.contains(sHandler.getServerId()))
    {
      // Assured mode: post an assured or not assured matching update
      // message according to what has been computed for the destination server
      if (expectedServers.contains(sHandler.getServerId()))
      {
        sHandler.add(update);
      }
      else
      {
        if (notAssuredUpdate == null)
        {
          notAssuredUpdate = new NotAssuredUpdateMsg(update);
        }
        sHandler.add(notAssuredUpdate);
      }
      sHandler.add(notAssuredUpdateMsg);
    }
    else
    {
      sHandler.add(update);
      sHandler.add(updateMsg);
    }
    return notAssuredUpdate;
  }
  /**