mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
19.56.2009 9f6142d16669ab52980d41520298c78522ee8eef
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -60,7 +60,6 @@
  private int serverId;
  private ProtocolSession session;
  private ServerHandler handler;
  private ReplicationServerDomain replicationServerDomain;
  /**
   * Constructor for the LDAP server reader part of the replicationServer.
@@ -68,20 +67,15 @@
   * @param session The ProtocolSession from which to read the data.
   * @param serverId The server ID of the server from which we read messages.
   * @param handler The server handler for this server reader.
   * @param replicationServerDomain The ReplicationServerDomain for this server
   *        reader.
   */
  public ServerReader(ProtocolSession session, int serverId,
    ServerHandler handler,
    ReplicationServerDomain replicationServerDomain)
      ServerHandler handler)
  {
    super("Replication Reader Thread for handler of " +
        handler.toString() +
        " in " + replicationServerDomain);
    super("Replication Reader Thread for RS handler " +
        handler.getMonitorInstanceName());
    this.session = session;
    this.serverId = serverId;
    this.handler = handler;
    this.replicationServerDomain = replicationServerDomain;
  }
  /**
@@ -109,15 +103,14 @@
          if (debugEnabled())
          {
            TRACER.debugInfo("In " + replicationServerDomain + " " +
                getName() + " receives " + msg);
            TRACER.debugInfo("In " + getName() + " receives " + msg);
          }
          if (msg instanceof AckMsg)
          {
            AckMsg ack = (AckMsg) msg;
            handler.checkWindow();
            replicationServerDomain.processAck(ack, handler);
            handler.processAck(ack);
          } else if (msg instanceof UpdateMsg)
          {
            boolean filtered = false;
@@ -141,22 +134,19 @@
              if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
                (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
              {
                long referenceGenerationId =
                  replicationServerDomain.getGenerationId();
                long referenceGenerationId = handler.getReferenceGenId();
                if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
                  logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
                    Integer.toString(replicationServerDomain.
                    getReplicationServer().getServerId()),
                    replicationServerDomain.getBaseDn(),
                    Integer.toString(handler.getReplicationServerId()),
                    handler.getServiceId(),
                    ((UpdateMsg) msg).getChangeNumber().toString(),
                    Integer.toString(handler.getServerId()),
                    Long.toString(referenceGenerationId),
                    Long.toString(handler.getGenerationId())));
                if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
                  logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
                    Integer.toString(replicationServerDomain.
                    getReplicationServer().getServerId()),
                    replicationServerDomain.getBaseDn(),
                    Integer.toString(handler.getReplicationServerId()),
                    handler.getServiceId(),
                    ((UpdateMsg) msg).getChangeNumber().toString(),
                    Integer.toString(handler.getServerId())));
                filtered = true;
@@ -167,17 +157,15 @@
               * Ignore updates from RS with bad gen id
               * (no system managed status for a RS)
               */
              long referenceGenerationId =
                replicationServerDomain.getGenerationId();
              long referenceGenerationId =handler.getReferenceGenId();
              if ((referenceGenerationId > 0) &&
                (referenceGenerationId != handler.getGenerationId()))
              {
                logError(
                    ERR_IGNORING_UPDATE_FROM_RS.get(
                        Integer.toString(
                            replicationServerDomain.getReplicationServer().
                            getServerId()),
                        replicationServerDomain.getBaseDn(),
                            handler.getReplicationServerId()),
                        handler.getServiceId(),
                        ((UpdateMsg) msg).getChangeNumber().toString(),
                        Integer.toString(handler.getServerId()),
                        Long.toString(referenceGenerationId),
@@ -190,7 +178,7 @@
            {
              UpdateMsg update = (UpdateMsg) msg;
              handler.decAndCheckWindow();
              replicationServerDomain.put(update, handler);
              handler.put(update);
            }
          } else if (msg instanceof WindowMsg)
          {
@@ -220,7 +208,7 @@
          } else if (msg instanceof ResetGenerationIdMsg)
          {
            ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
            replicationServerDomain.resetGenerationId(handler, genIdMsg);
            handler.processResetGenId(genIdMsg);
          } else if (msg instanceof WindowProbeMsg)
          {
            WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
@@ -231,8 +219,7 @@
            try
            {
              ReplicationServerHandler rsh = (ReplicationServerHandler)handler;
              replicationServerDomain.receiveTopoInfoFromRS(topoMsg,
                  rsh, true);
              rsh.receiveTopoInfoFromRS(topoMsg);
            }
            catch(Exception e)
            {
@@ -247,13 +234,13 @@
            try
            {
              DataServerHandler dsh = (DataServerHandler)handler;
              replicationServerDomain.processNewStatus(dsh, csMsg);
              dsh.receiveNewStatus(csMsg);
            }
            catch(Exception e)
            {
              errMessage =
                ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(
                    replicationServerDomain.getBaseDn(),
                    handler.getServiceId(),
                    Integer.toString(handler.getServerId()),
                    csMsg.toString());
              logError(errMessage);
@@ -270,8 +257,7 @@
          } else if (msg instanceof ChangeTimeHeartbeatMsg)
          {
            ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg;
            replicationServerDomain.processChangeTimeHeartbeatMsg(handler,
                cthbMsg);
            handler.process(cthbMsg);
          } else if (msg instanceof StopMsg)
          {
            // Peer server is properly disconnecting: go out of here to
@@ -280,8 +266,7 @@
            {
              TRACER.debugInfo(handler.toString() + " has properly " +
                "disconnected from this replication server " +
                Integer.toString(replicationServerDomain.getReplicationServer().
                getServerId()));
                Integer.toString(handler.getReplicationServerId()));
            }
            return;
          } else if (msg == null)
@@ -300,9 +285,8 @@
          // we just trash the message and log the event for debug purpose,
          // then continue receiving messages.
          if (debugEnabled())
            TRACER.debugInfo("In " + replicationServerDomain.
              getReplicationServer().
              getMonitorInstanceName() + ":" + e.getMessage());
            TRACER.debugInfo(
                "In " + this.getName() + " " + stackTraceToSingleLineString(e));
        }
      }
    }
@@ -315,24 +299,16 @@
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " reader IO EXCEPTION for serverID=" + serverId + " " +
          this + " " +
          stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
            "In " + this.getName() + " " + stackTraceToSingleLineString(e));
      errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
        Integer.toString(replicationServerDomain.
        getReplicationServer().getServerId()));
        Integer.toString(handler.getReplicationServerId()));
      logError(errMessage);
    }
    catch (ClassNotFoundException e)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " reader CNF EXCEPTION serverID=" + serverId +
          stackTraceToSingleLineString(e));
            "In " + this.getName() + " " + stackTraceToSingleLineString(e));
      /*
       * The remote server has sent an unknown message,
       * close the connection.
@@ -344,10 +320,7 @@
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " server reader EXCEPTION serverID=" + serverId +
          " " + stackTraceToSingleLineString(e));
          "In " + this.getName() + " " + stackTraceToSingleLineString(e));
      /*
       * The remote server has sent an unknown message,
       * close the connection.
@@ -364,11 +337,6 @@
       */
      try
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "In RS " + replicationServerDomain.getReplicationServer().
            getMonitorInstanceName() +
            this + " is closing the session");
        if (handler.getProtocolVersion() >=
          ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
@@ -382,12 +350,14 @@
            // Anyway, going to close session, so nothing to do
          }
        }
        if (debugEnabled())
          TRACER.debugInfo("In " + this.getName() + " closing the session");
        session.close();
      } catch (IOException e)
      {
      // ignore
      }
      replicationServerDomain.stopServer(handler);
      handler.doStop();
      if (debugEnabled())
      {
        TRACER.debugInfo(this.getName() + " stopped " + errMessage);