mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
10.43.2009 ccc4127f23f63214f4dc2f94d26a021a3ec2eec6
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -26,15 +26,9 @@
 */
package org.opends.server.replication.service;
import org.opends.server.replication.protocol.HeartbeatMonitor;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
@@ -52,13 +46,33 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.HeartbeatMonitor;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ServerStartECLMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
import org.opends.server.replication.protocol.WindowProbeMsg;
import org.opends.server.util.ServerConstants;
import org.opends.server.replication.server.ReplicationServer;
/**
@@ -300,6 +314,43 @@
    }
  }
  private void connect()
  {
    if (this.baseDn.compareToIgnoreCase(
        ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)==0)
    {
      connectAsECL();
    }
    else
    {
      connectAsDataServer();
    }
  }
  /**
   * Special aspects of connecting as ECL compared to connecting as data server
   * are :
   * - 1 single RS configured
   * - so no choice of the prefered RS
   * - No same groupID polling
   * - ?? Heartbeat
   * - Start handshake is :
   *    Broker ---> StartECLMsg       ---> RS
   *          <---- ReplServerStartMsg ---
   *           ---> StartSessionECLMsg --> RS
   */
  private void connectAsECL()
  {
    // FIXME:ECL List of RS to connect is for now limited to one RS only
    String bestServer = this.servers.iterator().next();
    ReplServerStartMsg inReplServerStartMsg
      = performECLPhaseOneHandshake(bestServer, true);
    if (inReplServerStartMsg!=null)
      performECLPhaseTwoHandshake(bestServer);
  }
  /**
   * Connect to a ReplicationServer.
   *
@@ -325,7 +376,7 @@
   *
   * @throws NumberFormatException address was invalid
   */
  private void connect()
  private void connectAsDataServer()
  {
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
@@ -350,6 +401,9 @@
       * Connect to each replication server and get their ServerState then find
       * out which one is the best to connect to.
       */
      if (debugEnabled())
        TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " +
            " order to elect the prefered one");
      for (String server : servers)
      {
        // Connect to server and get reply message
@@ -375,6 +429,9 @@
          serverId, baseDn, groupId);
        // Best found, now initialize connection to this one (handshake phase 1)
        if (debugEnabled())
          TRACER.debugInfo(
              "phase 2 : will perform PhaseOneH with the prefered RS.");
        replServerStartMsg = performPhaseOneHandshake(bestServer, true);
        if (replServerStartMsg != null) // Handshake phase 1 exchange went well
@@ -764,6 +821,8 @@
      {
        try
        {
          if (debugEnabled())
            TRACER.debugInfo("In RB, closing session after phase 1");
          localSession.close();
        } catch (IOException e)
        {
@@ -789,6 +848,225 @@
  }
  /**
   * Connect to the provided server performing the first phase handshake
   * (start messages exchange) and return the reply message from the replication
   * server.
   *
   * @param server Server to connect to.
   * @param keepConnection Do we keep session opened or not after handshake.
   *        Use true if want to perform handshake phase 2 with the same session
   *        and keep the session to create as the current one.
   * @return The ReplServerStartMsg the server replied. Null if could not
   *         get an answer.
   */
  private ReplServerStartMsg performECLPhaseOneHandshake(String server,
    boolean keepConnection)
  {
    ReplServerStartMsg replServerStartMsg = null;
    // Parse server string.
    int separator = server.lastIndexOf(':');
    String port = server.substring(separator + 1);
    String hostname = server.substring(0, separator);
    ProtocolSession localSession = null;
    boolean error = false;
    try
    {
      /*
       * Open a socket connection to the next candidate.
       */
      int intPort = Integer.parseInt(port);
      InetSocketAddress serverAddr = new InetSocketAddress(
        InetAddress.getByName(hostname), intPort);
      if (keepConnection)
        tmpReadableServerName = serverAddr.toString();
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      socket.connect(serverAddr, 500);
      localSession = replSessionSecurity.createClientSession(server, socket,
        ReplSessionSecurity.HANDSHAKE_TIMEOUT);
      boolean isSslEncryption =
        replSessionSecurity.isSslEncryption(server);
      // Send our start msg.
      ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg(
          baseDn, 0, 0, 0, 0,
          maxRcvWindow, heartbeatInterval, state,
          ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
          isSslEncryption,
          groupId);
      localSession.publish(serverStartECLMsg);
      // Read the ReplServerStartMsg that should come back.
      replServerStartMsg = (ReplServerStartMsg) localSession.receive();
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDn +
          "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() +
          "\nAND RECEIVED:\n" + replServerStartMsg.toString());
      }
      // Sanity check
      String repDn = replServerStartMsg.getBaseDn();
      if (!(this.baseDn.equals(repDn)))
      {
        Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
          this.baseDn);
        logError(message);
        error = true;
      }
      /*
       * We have sent our own protocol version to the replication server.
       * The replication server will use the same one (or an older one
       * if it is an old replication server).
       */
      if (keepConnection)
        protocolVersion = ProtocolVersion.minWithCurrent(
          replServerStartMsg.getVersion());
      if (!isSslEncryption)
      {
        localSession.stopEncryption();
      }
    } catch (ConnectException e)
    {
      /*
       * There was no server waiting on this host:port
       * Log a notice and try the next replicationServer in the list
       */
      if (!connectionError)
      {
        Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
        if (keepConnection) // Log error message only for final connection
        {
          // the error message is only logged once to avoid overflowing
          // the error log
          logError(message);
        } else if (debugEnabled())
        {
          TRACER.debugInfo(message.toString());
        }
      }
      error = true;
    } catch (Exception e)
    {
      if ( (e instanceof SocketTimeoutException) && debugEnabled() )
      {
        TRACER.debugInfo("Timeout trying to connect to RS " + server +
          " for dn: " + baseDn);
      }
      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1",
        baseDn, server, e.getLocalizedMessage() +
        stackTraceToSingleLineString(e));
      if (keepConnection) // Log error message only for final connection
      {
        logError(message);
      } else if (debugEnabled())
      {
        TRACER.debugInfo(message.toString());
      }
      error = true;
    }
    // Close session if requested
    if (!keepConnection || error)
    {
      if (localSession != null)
      {
        try
        {
          if (debugEnabled())
            TRACER.debugInfo("In RB, closing session after phase 1");
          localSession.close();
        } catch (IOException e)
        {
          // The session was already closed, just ignore.
        }
        localSession = null;
      }
      if (error)
      {
        replServerStartMsg = null;
      } // Be sure to return null.
    }
    // If this connection as the one to use for sending and receiving updates,
    // store it.
    if (keepConnection)
    {
      session = localSession;
    }
    return replServerStartMsg;
  }
  /**
   * Performs the second phase handshake (send StartSessionMsg and receive
   * TopologyMsg messages exchange) and return the reply message from the
   * replication server.
   *
   * @param server Server we are connecting with.
   * @param initStatus The status we are starting with
   * @return The ReplServerStartMsg the server replied. Null if could not
   *         get an answer.
   */
  private TopologyMsg performECLPhaseTwoHandshake(String server)
  {
    TopologyMsg topologyMsg = null;
    try
    {
      // Send our Start Session
      StartECLSessionMsg startECLSessionMsg = null;
      startECLSessionMsg = new StartECLSessionMsg();
      startECLSessionMsg.setOperationId(Short.toString(serverId));
      session.publish(startECLSessionMsg);
      /* FIXME:ECL In the handshake phase two, should RS send back a topo msg ?
       * Read the TopologyMsg that should come back.
      topologyMsg = (TopologyMsg) session.receive();
       */
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDn +
          "\nRB HANDSHAKE SENT:\n" + startECLSessionMsg.toString());
      //  +   "\nAND RECEIVED:\n" + topologyMsg.toString());
      }
      // Alright set the timeout to the desired value
      session.setSoTimeout(timeout);
      connected = true;
    } catch (Exception e)
    {
      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2",
        baseDn, server, e.getLocalizedMessage() +
        stackTraceToSingleLineString(e));
      logError(message);
      if (session != null)
      {
        try
        {
          session.close();
        } catch (IOException ex)
        {
          // The session was already closed, just ignore.
        }
        session = null;
      }
      // Be sure to return null.
      topologyMsg = null;
    }
    return topologyMsg;
  }
  /**
   * Performs the second phase handshake (send StartSessionMsg and receive
   * TopologyMsg messages exchange) and return the reply message from the
   * replication server.
@@ -1720,6 +1998,12 @@
    {
      boolean done = false;
      if (debugEnabled())
      {
        TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() +
          " started.");
      }
      while ((!done) && (!sameGroupIdPollershutdown))
      {
        // Sleep some time between checks
@@ -1850,11 +2134,6 @@
   */
  public void receiveTopo(TopologyMsg topoMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo("Replication domain " + baseDn
        + " received topology info update:\n" + topoMsg);
    // Store new lists
    synchronized(getDsList())
    {