mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
11.43.2014 0642bd56015b5571ada7bb38b77844c08b574d4c
opendj3-server-dev/src/server/org/opends/server/replication/server/ServerReader.java
@@ -77,7 +77,6 @@
  @Override
  public void run()
  {
    LocalizableMessage errMessage = null;
    if (logger.isTraceEnabled())
    {
      logger.trace(getName() + " starting");
@@ -87,16 +86,19 @@
     * grab all incoming messages and publish them to the
     * replicationServerDomain
     */
    LocalizableMessage errMessage = null;
    try
    {
      while (true)
      {
        try
        {
          ReplicationMsg msg = session.receive();
          final ReplicationMsg msg = session.receive();
          if (logger.isTraceEnabled())
          {
            logger.trace("In " + getName() + " receives " + msg);
          }
          if (msg instanceof AckMsg)
          {
@@ -105,67 +107,8 @@
          }
          else if (msg instanceof UpdateMsg)
          {
            UpdateMsg updateMsg = (UpdateMsg) msg;
            boolean filtered = false;
            // Ignore updates in some cases
            if (handler.isDataServer())
            {
              /*
               * Ignore updates from DS in bad BAD_GENID_STATUS or
               * FULL_UPDATE_STATUS The RSD lock should not be taken here as it
               * is acceptable to have a delay between the time the server has a
               * wrong status and the fact we detect it: the updates that
               * succeed to pass during this time will have no impact on remote
               * server. But it is interesting to not saturate uselessly the
               * network if the updates are not necessary so this check to stop
               * sending updates is interesting anyway. Not taking the RSD lock
               * allows to have better performances in normal mode (most of the
               * time).
               */
              ServerStatus dsStatus = handler.getStatus();
              if (dsStatus == BAD_GEN_ID_STATUS
                  || dsStatus == FULL_UPDATE_STATUS)
              {
                long referenceGenerationId = handler.getReferenceGenId();
                if (dsStatus == BAD_GEN_ID_STATUS)
                {
                  logger.warn(WARN_IGNORING_UPDATE_FROM_DS_BADGENID,
                      handler.getReplicationServerId(), updateMsg.getCSN(),
                      handler.getBaseDN(), handler.getServerId(),
                      session.getReadableRemoteAddress(),
                      handler.getGenerationId(), referenceGenerationId);
                }
                if (dsStatus == FULL_UPDATE_STATUS)
                {
                  logger.warn(WARN_IGNORING_UPDATE_FROM_DS_FULLUP,
                      handler.getReplicationServerId(), updateMsg.getCSN(),
                      handler.getBaseDN(), handler.getServerId(),
                      session.getReadableRemoteAddress());
                }
                filtered = true;
              }
            }
            else
            {
              /*
               * Ignore updates from RS with bad gen id (no system managed
               * status for a RS)
               */
              long referenceGenerationId = handler.getReferenceGenId();
              if (referenceGenerationId > 0
                  && referenceGenerationId != handler.getGenerationId())
              {
                logger.error(WARN_IGNORING_UPDATE_FROM_RS,
                    handler.getReplicationServerId(), updateMsg.getCSN(),
                    handler.getBaseDN(), handler.getServerId(),
                    session.getReadableRemoteAddress(),
                    handler.getGenerationId(), referenceGenerationId);
                filtered = true;
              }
            }
            if (!filtered)
            final UpdateMsg updateMsg = (UpdateMsg) msg;
            if (!isUpdateMsgFiltered(updateMsg))
            {
              handler.put(updateMsg);
            }
@@ -301,6 +244,70 @@
    }
  }
  /**
   * Returns whether the update message is filtered in one of those cases:
   * <ul>
   * <li>Ignore updates from DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS</li>
   * <li>Ignore updates from RS with bad gen id</li>
   * </ul>
   */
  private boolean isUpdateMsgFiltered(UpdateMsg updateMsg)
  {
    if (handler.isDataServer())
    {
      /**
       * Ignore updates from DS in bad BAD_GENID_STATUS or
       * FULL_UPDATE_STATUS
       *
       * The RSD lock should not be taken here as it is acceptable to
       * have a delay between the time the server has a wrong status and
       * the fact we detect it: the updates that succeed to pass during
       * this time will have no impact on remote server. But it is
       * interesting to not saturate uselessly the network if the
       * updates are not necessary so this check to stop sending updates
       * is interesting anyway. Not taking the RSD lock allows to have
       * better performances in normal mode (most of the time).
       */
      final ServerStatus dsStatus = handler.getStatus();
      if (dsStatus == BAD_GEN_ID_STATUS)
      {
        logger.warn(WARN_IGNORING_UPDATE_FROM_DS_BADGENID,
            handler.getReplicationServerId(), updateMsg.getCSN(),
            handler.getBaseDN(), handler.getServerId(),
            session.getReadableRemoteAddress(),
            handler.getGenerationId(), handler.getReferenceGenId());
        return true;
      }
      else if (dsStatus == FULL_UPDATE_STATUS)
      {
        logger.warn(WARN_IGNORING_UPDATE_FROM_DS_FULLUP,
            handler.getReplicationServerId(), updateMsg.getCSN(),
            handler.getBaseDN(), handler.getServerId(),
            session.getReadableRemoteAddress());
        return true;
      }
    }
    else
    {
      /**
       * Ignore updates from RS with bad gen id
       * (no system managed status for a RS)
       */
      long referenceGenerationId = handler.getReferenceGenId();
      if (referenceGenerationId > 0
          && referenceGenerationId != handler.getGenerationId())
      {
        logger.error(WARN_IGNORING_UPDATE_FROM_RS,
            handler.getReplicationServerId(), updateMsg.getCSN(),
            handler.getBaseDN(), handler.getServerId(),
            session.getReadableRemoteAddress(),
            handler.getGenerationId(), referenceGenerationId);
        return true;
      }
    }
    return false;
  }
  private void logException(Exception e)
  {
    if (logger.isTraceEnabled())