mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

jcduff
23.04.2008 b4f8838b15342670c31753a484abf0129e3c9653
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -25,6 +25,7 @@
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -36,23 +37,26 @@
import java.io.IOException;
import org.opends.server.api.DirectoryThread;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.DoneMessage;
import org.opends.server.replication.protocol.EntryMessage;
import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.InitializeTargetMessage;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.replication.protocol.MonitorMessage;
import org.opends.server.replication.protocol.MonitorRequestMessage;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
import org.opends.server.replication.protocol.WindowProbeMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.
  NotSupportedOldVersionPDUException;
/**
 * This class implement the part of the replicationServer that is reading
@@ -66,11 +70,11 @@
 */
public class ServerReader extends DirectoryThread
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private short serverId;
  private ProtocolSession session;
  private ServerHandler handler;
@@ -86,10 +90,11 @@
   *        reader.
   */
  public ServerReader(ProtocolSession session, short serverId,
                      ServerHandler handler,
                      ReplicationServerDomain replicationServerDomain)
    ServerHandler handler,
    ReplicationServerDomain replicationServerDomain)
  {
    super(handler.toString() + " reader");
    super("Replication Reader for " + handler.toString() + " in RS " +
      replicationServerDomain.getReplicationServer().getServerId());
    this.session = session;
    this.serverId = serverId;
    this.handler = handler;
@@ -104,10 +109,10 @@
    if (debugEnabled())
    {
      TRACER.debugInfo(
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?" RS ":" LS")+
          " reader starting for serverId=" + serverId);
        "In RS " + replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() +
        (handler.isReplicationServer() ? " RS " : " LS") +
        " reader starting for serverId=" + serverId);
    }
    /*
     * wait on input stream
@@ -118,103 +123,146 @@
    {
      while (true)
      {
        ReplicationMessage msg = session.receive();
        ReplicationMsg msg = session.receive();
        /*
        if (debugEnabled())
        {
          TRACER.debugInfo(
              "In RS " + replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() +
              (handler.isReplicationServer()?" From RS ":" From LS")+
              " with serverId=" + serverId + " receives " + msg);
        TRACER.debugInfo(
        "In RS " + replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() +
        (handler.isReplicationServer()?" From RS ":" From LS")+
        " with serverId=" + serverId + " receives " + msg);
        }
        */
        if (msg instanceof AckMessage)
         */
        if (msg instanceof AckMsg)
        {
          AckMessage ack = (AckMessage) msg;
          AckMsg ack = (AckMsg) msg;
          handler.checkWindow();
          replicationServerDomain.ack(ack, serverId);
        }
        else if (msg instanceof UpdateMessage)
        } else if (msg instanceof UpdateMsg)
        {
          // Ignore update received from a replica with
          // a bad generation ID
          long referenceGenerationId =
                  replicationServerDomain.getGenerationId();
          if ((referenceGenerationId>0) &&
          boolean filtered = false;
          /* Ignore updates in some cases */
          if (handler.isLDAPserver())
          {
            /**
             * Ignore updates from DS in bad BAD_GENID_STATUS or
             * FULL_UPDATE_STATUS
             *
             * The RSD lock should not be taken here as it is acceptable to have
             * a delay between the time the server has a wrong status and the
             * fact we detect it: the updates that succeed to pass during this
             * time will have no impact on remote server. But it is interesting
             * to not saturate uselessly the network if the updates are not
             * necessary so this check to stop sending updates is interesting
             * anyway. Not taking the RSD lock allows to have better
             * performances in normal mode (most of the time).
             */
            ServerStatus dsStatus = handler.getStatus();
            if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
              (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
            {
              long referenceGenerationId =
                replicationServerDomain.getGenerationId();
              if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
                logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
                  Short.toString(replicationServerDomain.getReplicationServer().
                  getServerId()),
                  replicationServerDomain.getBaseDn().toNormalizedString(),
                  ((UpdateMsg) msg).getChangeNumber().toString(),
                  Short.toString(handler.getServerId()),
                  Long.toString(referenceGenerationId),
                  Long.toString(handler.getGenerationId())));
              if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
                logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
                  Short.toString(replicationServerDomain.getReplicationServer().
                  getServerId()),
                  replicationServerDomain.getBaseDn().toNormalizedString(),
                  ((UpdateMsg) msg).getChangeNumber().toString(),
                  Short.toString(handler.getServerId())));
              filtered = true;
            }
          } else
          {
            /**
             * Ignore updates from RS with bad gen id
             * (no system managed status for a RS)
             */
            long referenceGenerationId =
              replicationServerDomain.getGenerationId();
            if ((referenceGenerationId > 0) &&
              (referenceGenerationId != handler.getGenerationId()))
          {
            logError(ERR_IGNORING_UPDATE_FROM.get(
                msg.toString(),
                handler.getMonitorInstanceName()));
            {
              logError(ERR_IGNORING_UPDATE_FROM_RS.get(
                Short.toString(replicationServerDomain.getReplicationServer().
                getServerId()),
                replicationServerDomain.getBaseDn().toNormalizedString(),
                ((UpdateMsg) msg).getChangeNumber().toString(),
                Short.toString(handler.getServerId()),
                Long.toString(referenceGenerationId),
                Long.toString(handler.getGenerationId())));
              filtered = true;
            }
          }
          else
          if (!filtered)
          {
            UpdateMessage update = (UpdateMessage) msg;
            UpdateMsg update = (UpdateMsg) msg;
            handler.decAndCheckWindow();
            replicationServerDomain.put(update, handler);
          }
        }
        else if (msg instanceof WindowMessage)
        } else if (msg instanceof WindowMsg)
        {
          WindowMessage windowMsg = (WindowMessage) msg;
          WindowMsg windowMsg = (WindowMsg) msg;
          handler.updateWindow(windowMsg);
        }
        else if (msg instanceof InitializeRequestMessage)
        } else if (msg instanceof InitializeRequestMsg)
        {
          InitializeRequestMessage initializeMsg =
            (InitializeRequestMessage) msg;
          InitializeRequestMsg initializeMsg =
            (InitializeRequestMsg) msg;
          handler.process(initializeMsg);
        }
        else if (msg instanceof InitializeTargetMessage)
        } else if (msg instanceof InitializeTargetMsg)
        {
          InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg;
          InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
          handler.process(initializeMsg);
        }
        else if (msg instanceof EntryMessage)
        } else if (msg instanceof EntryMsg)
        {
          EntryMessage entryMsg = (EntryMessage) msg;
          EntryMsg entryMsg = (EntryMsg) msg;
          handler.process(entryMsg);
        }
        else if (msg instanceof DoneMessage)
        } else if (msg instanceof DoneMsg)
        {
          DoneMessage doneMsg = (DoneMessage) msg;
          DoneMsg doneMsg = (DoneMsg) msg;
          handler.process(doneMsg);
        }
        else if (msg instanceof ErrorMessage)
        } else if (msg instanceof ErrorMsg)
        {
          ErrorMessage errorMsg = (ErrorMessage) msg;
          ErrorMsg errorMsg = (ErrorMsg) msg;
          handler.process(errorMsg);
        }
        else if (msg instanceof ResetGenerationId)
        } else if (msg instanceof ResetGenerationIdMsg)
        {
          ResetGenerationId genIdMsg = (ResetGenerationId) msg;
          replicationServerDomain.resetGenerationId(this.handler, genIdMsg);
        }
        else if (msg instanceof WindowProbe)
          ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
          replicationServerDomain.resetGenerationId(handler, genIdMsg);
        } else if (msg instanceof WindowProbeMsg)
        {
          WindowProbe windowProbeMsg = (WindowProbe) msg;
          WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
          handler.process(windowProbeMsg);
        }
        else if (msg instanceof ReplServerInfoMessage)
        } else if (msg instanceof TopologyMsg)
        {
          ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
          handler.receiveReplServerInfo(infoMsg);
          replicationServerDomain.receiveReplServerInfo(infoMsg, handler);
        }
        else if (msg instanceof MonitorRequestMessage)
          TopologyMsg topoMsg = (TopologyMsg) msg;
          replicationServerDomain.receiveTopoInfoFromRS(topoMsg, handler, true);
        } else if (msg instanceof ChangeStatusMsg)
        {
          MonitorRequestMessage replServerMonitorRequestMsg =
            (MonitorRequestMessage) msg;
          ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
          replicationServerDomain.processNewStatus(handler, csMsg);
        } else if (msg instanceof MonitorRequestMsg)
        {
          MonitorRequestMsg replServerMonitorRequestMsg =
            (MonitorRequestMsg) msg;
          handler.process(replServerMonitorRequestMsg);
        }
        else if (msg instanceof MonitorMessage)
        } else if (msg instanceof MonitorMsg)
        {
          MonitorMessage replServerMonitorMsg = (MonitorMessage) msg;
          MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
          handler.process(replServerMonitorMsg);
        }
        else if (msg == null)
        } else if (msg == null)
        {
          /*
           * The remote server has sent an unknown message,
@@ -236,9 +284,11 @@
        TRACER.debugInfo(
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " reader IO EXCEPTION for serverID=" + serverId
          + stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
      Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
          " reader IO EXCEPTION for serverID=" + serverId +
          stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
      Message message = NOTE_SERVER_DISCONNECT.get(handler.toString(),
        Short.toString(replicationServerDomain.
        getReplicationServer().getServerId()));
      logError(message);
    } catch (ClassNotFoundException e)
    {
@@ -246,30 +296,36 @@
        TRACER.debugInfo(
          "In RS <" + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " reader CNF EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e));
          " reader CNF EXCEPTION serverID=" + serverId +
          stackTraceToSingleLineString(e));
      /*
       * The remote server has sent an unknown message,
       * close the connection.
       */
      Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
      logError(message);
    } catch (NotSupportedOldVersionPDUException e)
    {
      // Received a V1 PDU we do not need to support:
      // we just trash the message and log the event for debug purpose
      if (debugEnabled())
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() + ":" + e.getMessage());
    } catch (Exception e)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " server reader EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e));
          " server reader EXCEPTION serverID=" + serverId +
          stackTraceToSingleLineString(e));
      /*
       * The remote server has sent an unknown message,
       * close the connection.
       */
      Message message = NOTE_READER_EXCEPTION.get(handler.toString());
      logError(message);
    }
    finally
    } finally
    {
      /*
       * The thread only exit the loop above is some error condition
@@ -287,15 +343,15 @@
        session.close();
      } catch (IOException e)
      {
       // ignore
      // ignore
      }
      replicationServerDomain.stopServer(handler);
    }
    if (debugEnabled())
      TRACER.debugInfo(
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?" RS":" LDAP") +
          " server reader stopped for serverID=" + serverId);
        "In RS " + replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() +
        (handler.isReplicationServer() ? " RS" : " LDAP") +
        " server reader stopped for serverID=" + serverId);
  }
}