mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
03.19.2014 faa556c351fae0bf73a939962426876944dddd25
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -83,7 +83,6 @@
  @Override
  public void run()
  {
    Message errMessage = null;
    if (debugEnabled())
    {
      TRACER.debugInfo(getName() + " starting");
@@ -93,91 +92,34 @@
     * grab all incoming messages and publish them to the
     * replicationServerDomain
     */
    Message errMessage = null;
    try
    {
      while (true)
      {
        try
        {
          ReplicationMsg msg = session.receive();
          final ReplicationMsg msg = session.receive();
          if (debugEnabled())
          {
            TRACER.debugInfo("In " + getName() + " receives " + msg);
          }
          if (msg instanceof AckMsg)
          {
            handler.checkWindow();
            handler.processAck((AckMsg) msg);
          } else if (msg instanceof UpdateMsg)
          }
          else if (msg instanceof UpdateMsg)
          {
            UpdateMsg updateMsg = (UpdateMsg) msg;
            boolean filtered = false;
            // Ignore updates in some cases
            if (handler.isDataServer())
            {
              /**
               * Ignore updates from DS in bad BAD_GENID_STATUS or
               * FULL_UPDATE_STATUS
               *
               * The RSD lock should not be taken here as it is acceptable to
               * have a delay between the time the server has a wrong status and
               * the fact we detect it: the updates that succeed to pass during
               * this time will have no impact on remote server. But it is
               * interesting to not saturate uselessly the network if the
               * updates are not necessary so this check to stop sending updates
               * is interesting anyway. Not taking the RSD lock allows to have
               * better performances in normal mode (most of the time).
               */
              ServerStatus dsStatus = handler.getStatus();
              if (dsStatus == BAD_GEN_ID_STATUS
                  || dsStatus == FULL_UPDATE_STATUS)
              {
                long referenceGenerationId = handler.getReferenceGenId();
                if (dsStatus == BAD_GEN_ID_STATUS)
                  logError(WARN_IGNORING_UPDATE_FROM_DS_BADGENID.get(
                      handler.getReplicationServerId(),
                      updateMsg.getCSN().toString(),
                      handler.getBaseDNString(), handler.getServerId(),
                      session.getReadableRemoteAddress(),
                      handler.getGenerationId(),
                      referenceGenerationId));
                if (dsStatus == FULL_UPDATE_STATUS)
                  logError(WARN_IGNORING_UPDATE_FROM_DS_FULLUP.get(
                      handler.getReplicationServerId(),
                      updateMsg.getCSN().toString(),
                      handler.getBaseDNString(), handler.getServerId(),
                      session.getReadableRemoteAddress()));
                filtered = true;
              }
            } else
            {
              /**
               * Ignore updates from RS with bad gen id
               * (no system managed status for a RS)
               */
              long referenceGenerationId = handler.getReferenceGenId();
              if (referenceGenerationId > 0
                  && referenceGenerationId != handler.getGenerationId())
              {
                logError(
                    WARN_IGNORING_UPDATE_FROM_RS.get(
                        handler.getReplicationServerId(),
                        updateMsg.getCSN().toString(),
                        handler.getBaseDNString(),
                        handler.getServerId(),
                        session.getReadableRemoteAddress(),
                        handler.getGenerationId(),
                        referenceGenerationId));
                filtered = true;
              }
            }
            if (!filtered)
            final UpdateMsg updateMsg = (UpdateMsg) msg;
            if (!isUpdateMsgFiltered(updateMsg))
            {
              handler.put(updateMsg);
            }
          } else if (msg instanceof WindowMsg)
          }
          else if (msg instanceof WindowMsg)
          {
            handler.updateWindow((WindowMsg) msg);
          }
@@ -301,6 +243,76 @@
    }
  }
  /**
   * Returns whether the update message is filtered in one of those cases:
   * <ul>
   * <li>Ignore updates from DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS</li>
   * <li>Ignore updates from RS with bad gen id</li>
   * </ul>
   */
  private boolean isUpdateMsgFiltered(UpdateMsg updateMsg)
  {
    if (handler.isDataServer())
    {
      /**
       * Ignore updates from DS in bad BAD_GENID_STATUS or
       * FULL_UPDATE_STATUS
       *
       * The RSD lock should not be taken here as it is acceptable to
       * have a delay between the time the server has a wrong status and
       * the fact we detect it: the updates that succeed to pass during
       * this time will have no impact on remote server. But it is
       * interesting to not saturate uselessly the network if the
       * updates are not necessary so this check to stop sending updates
       * is interesting anyway. Not taking the RSD lock allows to have
       * better performances in normal mode (most of the time).
       */
      final ServerStatus dsStatus = handler.getStatus();
      if (dsStatus == BAD_GEN_ID_STATUS)
      {
        logError(WARN_IGNORING_UPDATE_FROM_DS_BADGENID.get(
            handler.getReplicationServerId(),
            updateMsg.getCSN().toString(),
            handler.getBaseDNString(), handler.getServerId(),
            session.getReadableRemoteAddress(),
            handler.getGenerationId(),
            handler.getReferenceGenId()));
        return true;
      }
      else if (dsStatus == FULL_UPDATE_STATUS)
      {
        logError(WARN_IGNORING_UPDATE_FROM_DS_FULLUP.get(
            handler.getReplicationServerId(),
            updateMsg.getCSN().toString(),
            handler.getBaseDNString(), handler.getServerId(),
            session.getReadableRemoteAddress()));
        return true;
      }
    }
    else
    {
      /**
       * Ignore updates from RS with bad gen id
       * (no system managed status for a RS)
       */
      long referenceGenerationId = handler.getReferenceGenId();
      if (referenceGenerationId > 0
          && referenceGenerationId != handler.getGenerationId())
      {
        logError(WARN_IGNORING_UPDATE_FROM_RS.get(
            handler.getReplicationServerId(),
            updateMsg.getCSN().toString(),
            handler.getBaseDNString(),
            handler.getServerId(),
            session.getReadableRemoteAddress(),
            handler.getGenerationId(),
            referenceGenerationId));
        return true;
      }
    }
    return false;
  }
  private void logException(Exception e)
  {
    if (debugEnabled())