mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
08.02.2009 c02dd7f87e9ba574f06e5cc1eb36ebeb76b9f446
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,13 +50,14 @@
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
@@ -94,17 +95,21 @@
    if (providedMsg != null)
    {
      if (debugEnabled())
        TRACER.debugInfo("In "
            + ((handler!=null)?handler.toString():"Replication Server")
            + " closing session with err=" +
            providedMsg.toString());
        TRACER.debugInfo("In " +
          ((handler != null) ? handler.toString() : "Replication Server") +
          " closing session with err=" +
          providedMsg.toString());
      logError(providedMsg);
    }
    try
    {
      if (providedSession!=null)
      if (providedSession != null)
        // This method is only called when aborting a failing handshake and
        // not StopMsg should be sent in such situation. StopMsg are only
        // expected when full handshake has been performed, or at end of
        // handshake phase 1, when DS was just gathering available RS info
        providedSession.close();
    } catch (IOException ee)
    } catch (IOException e)
    {
      // ignore
    }
@@ -174,7 +179,10 @@
  private int rcvWindow;
  private int rcvWindowSizeHalf;
  private int maxRcvWindow;
  /**
   * The size of the receiving window.
   */
  protected int maxRcvWindow;
  /**
   * Semaphore that the writer uses to control the flow to the remote server.
   */
@@ -197,7 +205,7 @@
   */
  protected long localGenerationId = -1;
  /**
   * The generation id before procesing a new start handshake.
   * The generation id before processing a new start handshake.
   */
  protected long oldGenerationId = -1;
  /**
@@ -210,7 +218,7 @@
  protected boolean initSslEncryption;
  /**
   * The SSL encryption after the negociation with the peer.
   * The SSL encryption after the negotiation with the peer.
   */
  protected boolean sslEncryption;
  /**
@@ -275,17 +283,6 @@
    // be disturbed
    if (session!=null)
    {
      try
      {
        session.publish(
            new ErrorMsg(
                replicationServerDomain.getReplicationServer().getServerId(),
                serverId,
                reason));
      }
      catch(Exception e)
      {
      }
      closeSession(session, reason, this);
    }
@@ -991,7 +988,14 @@
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + this +
          " publishes message:\n" + msg);
    session.publish(msg);
    // Currently only MonitorMsg has to support a backward compatibility
    if (msg instanceof MonitorMsg)
    {
      session.publish(msg, protocolVersion);
    } else
    {
      session.publish(msg);
    }
  }
  /**
@@ -1017,35 +1021,6 @@
  }
  /**
   * Send the ReplServerStartMsg to the remote server (RS or DS).
   * @param requestedProtocolVersion The provided protocol version.
   * @return The ReplServerStartMsg sent.
   * @throws IOException When an exception occurs.
   */
  public ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
  throws IOException
  {
    this.localGenerationId = replicationServerDomain.getGenerationId();
    ReplServerStartMsg outReplServerStartMsg
    = new ReplServerStartMsg(
        replicationServerId,
        replicationServerURL,
        getServiceId(),
        maxRcvWindow,
        replicationServerDomain.getDbServerState(),
        protocolVersion,
        localGenerationId,
        sslEncryption,
        getLocalGroupId(),
        replicationServerDomain.
        getReplicationServer().getDegradedStatusThreshold());
    session.publish(outReplServerStartMsg, requestedProtocolVersion);
    return outReplServerStartMsg;
  }
  /**
   * Sends the provided TopologyMsg to the peer server.
   *
   * @param topoMsg The TopologyMsg message to be sent.
@@ -1058,7 +1033,7 @@
    // V1 Rs do not support the TopologyMsg
    if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      session.publish(topoMsg);
      session.publish(topoMsg, protocolVersion);
    }
  }
@@ -1110,6 +1085,18 @@
    if (session != null)
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end
        // communications
        try
        {
          session.publish(new StopMsg());
        } catch (IOException ioe)
        {
          // Anyway, going to close session, so nothing to do
        }
      }
      // Close session to end ServerReader or ServerWriter
      try
      {
@@ -1328,12 +1315,27 @@
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED:\n" +
          "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
          "\nAND REPLIED:\n" + outTopoMsg.toString());
    }
  }
  /**
   * Log stop message has been received.
   */
  protected void logStopReceived()
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
    }
  }
  /**
   * Log the messages involved in the Topology/StartSession handshake.
   * @param inStartECLSessionMsg The message received first.
   */