mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
03.30.2014 75fec93860ffcb666d6f1d6fce4472f63089e20c
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -53,7 +53,6 @@
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.HostPort;
import org.opends.server.util.ServerConstants;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -289,7 +288,7 @@
    {
      shutdown = false;
      this.rcvWindow = getMaxRcvWindow();
      connect();
      connectAsDataServer();
    }
  }
@@ -702,19 +701,6 @@
    }
  }
  private void connect()
  {
    if (getBaseDN().toNormalizedString().equalsIgnoreCase(
        ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
    {
      connectAsECL();
    }
    else
    {
      connectAsDataServer();
    }
  }
  /**
   * Contacts all replication servers to get information from them and being
   * able to choose the more suitable.
@@ -728,7 +714,7 @@
    for (String serverUrl : getReplicationServerUrls())
    {
      // Connect to server + get and store info about it
      final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false, false);
      final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false);
      final ReplicationServerInfo rsInfo = rs.rsInfo;
      if (rsInfo != null)
      {
@@ -740,35 +726,6 @@
  }
  /**
   * Special aspects of connecting as ECL (External Change Log) compared to
   * connecting as data server are :
   * <ul>
   * <li>1 single RS configured</li>
   * <li>so no choice of the preferred RS</li>
   * <li>?? Heartbeat</li>
   * <li>Start handshake is :
   *
   * <pre>
   *    Broker ---> StartECLMsg       ---> RS
   *          <---- ReplServerStartMsg ---
   *           ---> StartSessionECLMsg --> RS
   * </pre>
   *
   * </li>
   * </ul>
   */
  private void connectAsECL()
  {
    // FIXME:ECL List of RS to connect is for now limited to one RS only
    final String bestServerURL = getReplicationServerUrls().iterator().next();
    final ConnectedRS rs = performPhaseOneHandshake(bestServerURL, true, true);
    if (rs.isConnected())
    {
      performECLPhaseTwoHandshake(bestServerURL, rs);
    }
  }
  /**
   * Connect to a ReplicationServer.
   *
   * Handshake sequences between a DS and a RS is divided into 2 logical
@@ -844,7 +801,7 @@
              + evals.getBestRS());
        final ConnectedRS electedRS = performPhaseOneHandshake(
            evals.getBestRS().getServerURL(), true, false);
            evals.getBestRS().getServerURL(), true);
        final ReplicationServerInfo electedRsInfo = electedRS.rsInfo;
        if (electedRsInfo != null)
        {
@@ -1116,12 +1073,9 @@
   *          Do we keep session opened or not after handshake. Use true if want
   *          to perform handshake phase 2 with the same session and keep the
   *          session to create as the current one.
   * @param isECL
   *          Indicates whether or not the an ECL handshake is to be performed.
   * @return The answer from the server . Null if could not get an answer.
   */
  private ConnectedRS performPhaseOneHandshake(String serverURL,
      boolean keepSession, boolean isECL)
  private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession)
  {
    Session newSession = null;
    Socket socket = null;
@@ -1135,8 +1089,7 @@
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
      socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(),
          timeoutMS);
      socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS);
      newSession = replSessionSecurity.createClientSession(socket, timeoutMS);
      boolean isSslEncryption = replSessionSecurity.isSslEncryption();
@@ -1144,19 +1097,9 @@
      final HostPort hp = new HostPort(
          socket.getLocalAddress().getHostName(), socket.getLocalPort());
      final String url = hp.toString();
      final StartMsg serverStartMsg;
      if (!isECL)
      {
        serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
            getMaxRcvWindow(), config.getHeartbeatInterval(), state,
            getGenerationID(), isSslEncryption, getGroupId());
      }
      else
      {
        serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
            getMaxRcvWindow(), config.getHeartbeatInterval(), state,
            getGenerationID(), isSslEncryption, getGroupId());
      }
      final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
          getMaxRcvWindow(), config.getHeartbeatInterval(), state,
          getGenerationID(), isSslEncryption, getGroupId());
      newSession.publish(serverStartMsg);
      // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
@@ -1247,45 +1190,6 @@
    return setConnectedRS(ConnectedRS.noConnectedRS());
  }
  /**
   * Performs the second phase handshake for External Change Log (send
   * StartSessionMsg and receive TopologyMsg messages exchange) and return the
   * reply message from the replication server.
   *
   * @param server Server we are connecting with.
   */
  private void performECLPhaseTwoHandshake(String server, ConnectedRS rs)
  {
    try
    {
      // Send our Start Session
      final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
      startECLSessionMsg.setOperationId("-1");
      rs.session.publish(startECLSessionMsg);
      // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
      if (debugEnabled())
      {
        debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg);
      }
      // Alright set the timeout to the desired value
      rs.session.setSoTimeout(timeout);
      setConnectedRS(rs);
    }
    catch (Exception e)
    {
      logError(WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
          getServerId(), server, getBaseDN().toNormalizedString(),
          stackTraceToSingleLineString(e)));
      rs.session.close();
      setConnectedRS(ConnectedRS.noConnectedRS());
    }
  }
  /**
   * Performs the second phase handshake (send StartSessionMsg and receive
   * TopologyMsg messages exchange) and return the reply message from the
@@ -2288,7 +2192,7 @@
        try
        {
          connect();
          connectAsDataServer();
          rs = connectedRS.get();
        }
        catch (Exception e)
@@ -3103,7 +3007,8 @@
        Map<Integer, ReplicationServerInfo> previousRsInfos)
    {
      this.rsServerId = rsServerId;
      this.replicaInfos = dsInfosToKeep;
      this.replicaInfos = dsInfosToKeep == null
          ? Collections.<Integer, DSInfo>emptyMap() : dsInfosToKeep;
      this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
          previousRsInfos, configuredReplicationServerUrls);
    }
@@ -3310,7 +3215,9 @@
    @Override
    public String toString()
    {
      return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos
      return getClass().getSimpleName()
          + " rsServerId=" + rsServerId
          + ", replicaInfos=" + replicaInfos.values()
          + ", rsInfos=" + rsInfos.values();
    }
  }