mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
19.27.2013 c7077670daca3b689ed75e4bf71dad0483af8473
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -27,11 +27,6 @@
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.IOException;
import org.opends.messages.Message;
@@ -40,6 +35,12 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class implement the part of the replicationServer that is reading
 * the connection from the LDAP servers to get all the updates that
@@ -74,7 +75,7 @@
  public ServerReader(Session session, ServerHandler handler)
  {
    super("Replication server RS(" + handler.getReplicationServerId()
        + ") reading from " + handler.toString() + " at "
        + ") reading from " + handler + " at "
        + session.getReadableRemoteAddress());
    this.session = session;
    this.handler = handler;
@@ -90,7 +91,7 @@
    Message errMessage = null;
    if (debugEnabled())
    {
      TRACER.debugInfo(this.getName() + " starting");
      TRACER.debugInfo(getName() + " starting");
    }
    /*
     * wait on input stream
@@ -110,13 +111,14 @@
          if (msg instanceof AckMsg)
          {
            AckMsg ack = (AckMsg) msg;
            handler.checkWindow();
            handler.processAck(ack);
            handler.processAck((AckMsg) msg);
          } else if (msg instanceof UpdateMsg)
          {
            UpdateMsg updateMsg = (UpdateMsg) msg;
            boolean filtered = false;
            /* Ignore updates in some cases */
            // Ignore updates in some cases
            if (handler.isDataServer())
            {
              /**
@@ -133,22 +135,22 @@
               * better performances in normal mode (most of the time).
               */
              ServerStatus dsStatus = handler.getStatus();
              if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
                (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
              if (dsStatus == BAD_GEN_ID_STATUS
                  || dsStatus == FULL_UPDATE_STATUS)
              {
                long referenceGenerationId = handler.getReferenceGenId();
                if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
                if (dsStatus == BAD_GEN_ID_STATUS)
                  logError(WARN_IGNORING_UPDATE_FROM_DS_BADGENID.get(
                      handler.getReplicationServerId(),
                      ((UpdateMsg) msg).getChangeNumber().toString(),
                      updateMsg.getChangeNumber().toString(),
                      handler.getBaseDN(), handler.getServerId(),
                      session.getReadableRemoteAddress(),
                      handler.getGenerationId(),
                      referenceGenerationId));
                if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
                if (dsStatus == FULL_UPDATE_STATUS)
                  logError(WARN_IGNORING_UPDATE_FROM_DS_FULLUP.get(
                      handler.getReplicationServerId(),
                      ((UpdateMsg) msg).getChangeNumber().toString(),
                      updateMsg.getChangeNumber().toString(),
                      handler.getBaseDN(), handler.getServerId(),
                      session.getReadableRemoteAddress()));
                filtered = true;
@@ -159,14 +161,14 @@
               * Ignore updates from RS with bad gen id
               * (no system managed status for a RS)
               */
              long referenceGenerationId =handler.getReferenceGenId();
              if ((referenceGenerationId > 0) &&
                (referenceGenerationId != handler.getGenerationId()))
              long referenceGenerationId = handler.getReferenceGenId();
              if (referenceGenerationId > 0
                  && referenceGenerationId != handler.getGenerationId())
              {
                logError(
                    WARN_IGNORING_UPDATE_FROM_RS.get(
                        handler.getReplicationServerId(),
                        ((UpdateMsg) msg).getChangeNumber().toString(),
                        updateMsg.getChangeNumber().toString(),
                        handler.getBaseDN(),
                        handler.getServerId(),
                        session.getReadableRemoteAddress(),
@@ -178,53 +180,24 @@
            if (!filtered)
            {
              UpdateMsg update = (UpdateMsg) msg;
              handler.decAndCheckWindow();
              handler.put(update);
              handler.put(updateMsg);
            }
          } else if (msg instanceof WindowMsg)
          {
            WindowMsg windowMsg = (WindowMsg) msg;
            handler.updateWindow(windowMsg);
          } else if (msg instanceof InitializeRequestMsg)
            handler.updateWindow((WindowMsg) msg);
          } else if (msg instanceof RoutableMsg)
          {
            InitializeRequestMsg initializeMsg =
              (InitializeRequestMsg) msg;
            handler.process(initializeMsg);
          } else if (msg instanceof InitializeRcvAckMsg)
          {
            InitializeRcvAckMsg initializeRcvAckMsg =
              (InitializeRcvAckMsg) msg;
            handler.process(initializeRcvAckMsg);
          } else if (msg instanceof InitializeTargetMsg)
          {
            InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
            handler.process(initializeMsg);
          } else if (msg instanceof EntryMsg)
          {
            EntryMsg entryMsg = (EntryMsg) msg;
            handler.process(entryMsg);
          } else if (msg instanceof DoneMsg)
          {
            DoneMsg doneMsg = (DoneMsg) msg;
            handler.process(doneMsg);
          } else if (msg instanceof ErrorMsg)
          {
            ErrorMsg errorMsg = (ErrorMsg) msg;
            handler.process(errorMsg);
            handler.process((RoutableMsg) msg);
          } else if (msg instanceof ResetGenerationIdMsg)
          {
            ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
            handler.processResetGenId(genIdMsg);
            handler.processResetGenId((ResetGenerationIdMsg) msg);
          } else if (msg instanceof WindowProbeMsg)
          {
            WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
            handler.process(windowProbeMsg);
            handler.replyToWindowProbe();
          } else if (msg instanceof TopologyMsg)
          {
            TopologyMsg topoMsg = (TopologyMsg) msg;
            ReplicationServerHandler rsh = (ReplicationServerHandler)handler;
            rsh.receiveTopoInfoFromRS(topoMsg);
            rsh.receiveTopoInfoFromRS((TopologyMsg) msg);
          } else if (msg instanceof ChangeStatusMsg)
          {
            ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
@@ -242,28 +215,18 @@
                    csMsg.toString());
              logError(errMessage);
            }
          } else if (msg instanceof MonitorRequestMsg)
          {
            MonitorRequestMsg replServerMonitorRequestMsg =
              (MonitorRequestMsg) msg;
            handler.process(replServerMonitorRequestMsg);
          } else if (msg instanceof MonitorMsg)
          {
            MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
            handler.process(replServerMonitorMsg);
          } else if (msg instanceof ChangeTimeHeartbeatMsg)
          {
            ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg;
            handler.process(cthbMsg);
            handler.process((ChangeTimeHeartbeatMsg) msg);
          } else if (msg instanceof StopMsg)
          {
            // Peer server is properly disconnecting: go out of here to
            // properly close the server handler going to finally block.
            if (debugEnabled())
            {
              TRACER.debugInfo(handler.toString() + " has properly " +
                "disconnected from this replication server " +
                Integer.toString(handler.getReplicationServerId()));
              TRACER.debugInfo(handler
                  + " has properly disconnected from this replication server "
                  + handler.getReplicationServerId());
            }
            return;
          } else if (msg == null)
@@ -281,9 +244,7 @@
          // Received a V1 PDU we do not need to support:
          // we just trash the message and log the event for debug purpose,
          // then continue receiving messages.
          if (debugEnabled())
            TRACER.debugInfo(
                "In " + this.getName() + " " + stackTraceToSingleLineString(e));
          logException(e);
        }
      }
    }
@@ -294,9 +255,7 @@
       * Log a message and exit from this loop
       * So that this handler is stopped.
       */
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.getName() + " " + stackTraceToSingleLineString(e));
      logException(e);
      if (!handler.shuttingDown())
      {
        if (handler.isDataServer())
@@ -316,9 +275,7 @@
    }
    catch (Exception e)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.getName() + " " + stackTraceToSingleLineString(e));
      logException(e);
      /*
       * The remote server has sent an unknown message,
       * close the connection.
@@ -334,14 +291,21 @@
       */
      if (debugEnabled())
      {
        TRACER.debugInfo("In " + this.getName() + " closing the session");
        TRACER.debugInfo("In " + getName() + " closing the session");
      }
      session.close();
      handler.doStop();
      if (debugEnabled())
      {
        TRACER.debugInfo(this.getName() + " stopped " + errMessage);
        TRACER.debugInfo(getName() + " stopped: " + errMessage);
      }
    }
  }
  private void logException(Exception e)
  {
    if (debugEnabled())
      TRACER.debugInfo(
          "In " + getName() + " " + stackTraceToSingleLineString(e));
  }
}