mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
30.30.2008 2246119784cdfb6f882eba79ed96d2dd9f56f8f9
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -123,50 +123,80 @@
    {
      while (true)
      {
        ReplicationMsg msg = session.receive();
        try
        {
          ReplicationMsg msg = session.receive();
        /*
        if (debugEnabled())
        {
        TRACER.debugInfo(
        "In RS " + replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() +
        (handler.isReplicationServer()?" From RS ":" From LS")+
        " with serverId=" + serverId + " receives " + msg);
        }
         */
        if (msg instanceof AckMsg)
        {
          AckMsg ack = (AckMsg) msg;
          handler.checkWindow();
          replicationServerDomain.ack(ack, serverId);
        } else if (msg instanceof UpdateMsg)
        {
          boolean filtered = false;
          /* Ignore updates in some cases */
          if (handler.isLDAPserver())
          /*
          if (debugEnabled())
          {
            /**
             * Ignore updates from DS in bad BAD_GENID_STATUS or
             * FULL_UPDATE_STATUS
             *
             * The RSD lock should not be taken here as it is acceptable to have
             * a delay between the time the server has a wrong status and the
             * fact we detect it: the updates that succeed to pass during this
             * time will have no impact on remote server. But it is interesting
             * to not saturate uselessly the network if the updates are not
             * necessary so this check to stop sending updates is interesting
             * anyway. Not taking the RSD lock allows to have better
             * performances in normal mode (most of the time).
             */
            ServerStatus dsStatus = handler.getStatus();
            if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
              (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
          TRACER.debugInfo(
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?" From RS ":" From LS")+
          " with serverId=" + serverId + " receives " + msg);
          }
           */
          if (msg instanceof AckMsg)
          {
            AckMsg ack = (AckMsg) msg;
            handler.checkWindow();
            replicationServerDomain.ack(ack, serverId);
          } else if (msg instanceof UpdateMsg)
          {
            boolean filtered = false;
            /* Ignore updates in some cases */
            if (handler.isLDAPserver())
            {
              /**
               * Ignore updates from DS in bad BAD_GENID_STATUS or
               * FULL_UPDATE_STATUS
               *
               * The RSD lock should not be taken here as it is acceptable to
               * have a delay between the time the server has a wrong status and
               * the fact we detect it: the updates that succeed to pass during
               * this time will have no impact on remote server. But it is
               * interesting to not saturate uselessly the network if the
               * updates are not necessary so this check to stop sending updates
               * is interesting anyway. Not taking the RSD lock allows to have
               * better performances in normal mode (most of the time).
               */
              ServerStatus dsStatus = handler.getStatus();
              if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
                (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
              {
                long referenceGenerationId =
                  replicationServerDomain.getGenerationId();
                if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
                  logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
                    Short.toString(replicationServerDomain.
                    getReplicationServer().getServerId()),
                    replicationServerDomain.getBaseDn().toNormalizedString(),
                    ((UpdateMsg) msg).getChangeNumber().toString(),
                    Short.toString(handler.getServerId()),
                    Long.toString(referenceGenerationId),
                    Long.toString(handler.getGenerationId())));
                if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
                  logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
                    Short.toString(replicationServerDomain.
                    getReplicationServer().getServerId()),
                    replicationServerDomain.getBaseDn().toNormalizedString(),
                    ((UpdateMsg) msg).getChangeNumber().toString(),
                    Short.toString(handler.getServerId())));
                filtered = true;
              }
            } else
            {
              /**
               * Ignore updates from RS with bad gen id
               * (no system managed status for a RS)
               */
              long referenceGenerationId =
                replicationServerDomain.getGenerationId();
              if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
                logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
              if ((referenceGenerationId > 0) &&
                (referenceGenerationId != handler.getGenerationId()))
              {
                logError(ERR_IGNORING_UPDATE_FROM_RS.get(
                  Short.toString(replicationServerDomain.getReplicationServer().
                  getServerId()),
                  replicationServerDomain.getBaseDn().toNormalizedString(),
@@ -174,103 +204,86 @@
                  Short.toString(handler.getServerId()),
                  Long.toString(referenceGenerationId),
                  Long.toString(handler.getGenerationId())));
              if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
                logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
                  Short.toString(replicationServerDomain.getReplicationServer().
                  getServerId()),
                  replicationServerDomain.getBaseDn().toNormalizedString(),
                  ((UpdateMsg) msg).getChangeNumber().toString(),
                  Short.toString(handler.getServerId())));
              filtered = true;
                filtered = true;
              }
            }
          } else
          {
            /**
             * Ignore updates from RS with bad gen id
             * (no system managed status for a RS)
             */
            long referenceGenerationId =
              replicationServerDomain.getGenerationId();
            if ((referenceGenerationId > 0) &&
              (referenceGenerationId != handler.getGenerationId()))
            {
              logError(ERR_IGNORING_UPDATE_FROM_RS.get(
                Short.toString(replicationServerDomain.getReplicationServer().
                getServerId()),
                replicationServerDomain.getBaseDn().toNormalizedString(),
                ((UpdateMsg) msg).getChangeNumber().toString(),
                Short.toString(handler.getServerId()),
                Long.toString(referenceGenerationId),
                Long.toString(handler.getGenerationId())));
              filtered = true;
            }
          }
          if (!filtered)
            if (!filtered)
            {
              UpdateMsg update = (UpdateMsg) msg;
              handler.decAndCheckWindow();
              replicationServerDomain.put(update, handler);
            }
          } else if (msg instanceof WindowMsg)
          {
            UpdateMsg update = (UpdateMsg) msg;
            handler.decAndCheckWindow();
            replicationServerDomain.put(update, handler);
            WindowMsg windowMsg = (WindowMsg) msg;
            handler.updateWindow(windowMsg);
          } else if (msg instanceof InitializeRequestMsg)
          {
            InitializeRequestMsg initializeMsg =
              (InitializeRequestMsg) msg;
            handler.process(initializeMsg);
          } else if (msg instanceof InitializeTargetMsg)
          {
            InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
            handler.process(initializeMsg);
          } else if (msg instanceof EntryMsg)
          {
            EntryMsg entryMsg = (EntryMsg) msg;
            handler.process(entryMsg);
          } else if (msg instanceof DoneMsg)
          {
            DoneMsg doneMsg = (DoneMsg) msg;
            handler.process(doneMsg);
          } else if (msg instanceof ErrorMsg)
          {
            ErrorMsg errorMsg = (ErrorMsg) msg;
            handler.process(errorMsg);
          } else if (msg instanceof ResetGenerationIdMsg)
          {
            ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
            replicationServerDomain.resetGenerationId(handler, genIdMsg);
          } else if (msg instanceof WindowProbeMsg)
          {
            WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
            handler.process(windowProbeMsg);
          } else if (msg instanceof TopologyMsg)
          {
            TopologyMsg topoMsg = (TopologyMsg) msg;
            replicationServerDomain.receiveTopoInfoFromRS(topoMsg,
              handler, true);
          } else if (msg instanceof ChangeStatusMsg)
          {
            ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
            replicationServerDomain.processNewStatus(handler, csMsg);
          } else if (msg instanceof MonitorRequestMsg)
          {
            MonitorRequestMsg replServerMonitorRequestMsg =
              (MonitorRequestMsg) msg;
            handler.process(replServerMonitorRequestMsg);
          } else if (msg instanceof MonitorMsg)
          {
            MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
            handler.process(replServerMonitorMsg);
          } else if (msg == null)
          {
            /*
             * The remote server has sent an unknown message,
             * close the connection.
             */
            Message message = NOTE_READER_NULL_MSG.get(handler.toString());
            logError(message);
            return;
          }
        } else if (msg instanceof WindowMsg)
        } catch (NotSupportedOldVersionPDUException e)
        {
          WindowMsg windowMsg = (WindowMsg) msg;
          handler.updateWindow(windowMsg);
        } else if (msg instanceof InitializeRequestMsg)
        {
          InitializeRequestMsg initializeMsg =
            (InitializeRequestMsg) msg;
          handler.process(initializeMsg);
        } else if (msg instanceof InitializeTargetMsg)
        {
          InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
          handler.process(initializeMsg);
        } else if (msg instanceof EntryMsg)
        {
          EntryMsg entryMsg = (EntryMsg) msg;
          handler.process(entryMsg);
        } else if (msg instanceof DoneMsg)
        {
          DoneMsg doneMsg = (DoneMsg) msg;
          handler.process(doneMsg);
        } else if (msg instanceof ErrorMsg)
        {
          ErrorMsg errorMsg = (ErrorMsg) msg;
          handler.process(errorMsg);
        } else if (msg instanceof ResetGenerationIdMsg)
        {
          ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
          replicationServerDomain.resetGenerationId(handler, genIdMsg);
        } else if (msg instanceof WindowProbeMsg)
        {
          WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
          handler.process(windowProbeMsg);
        } else if (msg instanceof TopologyMsg)
        {
          TopologyMsg topoMsg = (TopologyMsg) msg;
          replicationServerDomain.receiveTopoInfoFromRS(topoMsg, handler, true);
        } else if (msg instanceof ChangeStatusMsg)
        {
          ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
          replicationServerDomain.processNewStatus(handler, csMsg);
        } else if (msg instanceof MonitorRequestMsg)
        {
          MonitorRequestMsg replServerMonitorRequestMsg =
            (MonitorRequestMsg) msg;
          handler.process(replServerMonitorRequestMsg);
        } else if (msg instanceof MonitorMsg)
        {
          MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
          handler.process(replServerMonitorMsg);
        } else if (msg == null)
        {
          /*
           * The remote server has sent an unknown message,
           * close the connection.
           */
          Message message = NOTE_READER_NULL_MSG.get(handler.toString());
          logError(message);
          return;
          // Received a V1 PDU we do not need to support:
          // we just trash the message and log the event for debug purpose,
          // then continue receiving messages.
          if (debugEnabled())
            TRACER.debugInfo("In " + replicationServerDomain.
              getReplicationServer().
              getMonitorInstanceName() + ":" + e.getMessage());
        }
      }
    } catch (IOException e)
@@ -304,13 +317,6 @@
       */
      Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
      logError(message);
    } catch (NotSupportedOldVersionPDUException e)
    {
      // Received a V1 PDU we do not need to support:
      // we just trash the message and log the event for debug purpose
      if (debugEnabled())
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() + ":" + e.getMessage());
    } catch (Exception e)
    {
      if (debugEnabled())