mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
14.45.2011 d2db6915a220002a55281ebeb94fc8c590a33853
Another fix for issue OpenDJ-95: Socket leak and constant disconnect/reconnect when a directory server can no longer reach its connected replication server
2 files modified
248 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 32 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 216 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -514,28 +514,38 @@
      TRACER.debugInfo("RS " + this.getMonitorInstanceName() +
               " connects to " + remoteServerURL);
    Socket socket = new Socket();
    ProtocolSession session = null;
    try
    {
      InetSocketAddress ServerAddr = new InetSocketAddress(
                     InetAddress.getByName(hostname), Integer.parseInt(port));
      Socket socket = new Socket();
          InetAddress.getByName(hostname), Integer.parseInt(port));
      socket.setTcpNoDelay(true);
      socket.connect(ServerAddr, 500);
      session = replSessionSecurity.createClientSession(socket,
          ReplSessionSecurity.HANDSHAKE_TIMEOUT);
      ReplicationServerHandler handler = new ReplicationServerHandler(
          replSessionSecurity.createClientSession(
              socket,
              ReplSessionSecurity.HANDSHAKE_TIMEOUT),
              queueSize,
              this.serverURL,
              serverId,
              this,
              rcvWindow);
          session, queueSize, this.serverURL, serverId, this,
          rcvWindow);
      handler.connect(baseDn, sslEncryption);
    }
    catch (Exception e)
    {
      // ignore
      if (session != null)
      {
        session.close();
      }
      try
      {
        socket.close();
      }
      catch (IOException ignored)
      {
        // Ignore.
      }
    }
  }
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -67,24 +67,7 @@
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ServerStartECLMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
import org.opends.server.replication.protocol.WindowProbeMsg;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.ServerConstants;
import org.opends.server.replication.server.ReplicationServer;
@@ -780,7 +763,7 @@
    {
      // Connect to server and get info about it
      ReplicationServerInfo replicationServerInfo =
        performPhaseOneHandshake(server, false);
        performPhaseOneHandshake(server, false, false);
      // Store server info in list
      if (replicationServerInfo != null)
@@ -808,11 +791,10 @@
    // FIXME:ECL List of RS to connect is for now limited to one RS only
    String bestServer = this.servers.iterator().next();
    ReplServerStartDSMsg inReplServerStartDSMsg = performECLPhaseOneHandshake(
      bestServer, true);
    if (inReplServerStartDSMsg != null)
    if (performPhaseOneHandshake(bestServer, true, true) != null)
    {
      performECLPhaseTwoHandshake(bestServer);
    }
  }
  /**
@@ -887,7 +869,7 @@
            " phase 2 : will perform PhaseOneH with the preferred RS="
              + replicationServerInfo);
        replicationServerInfo = performPhaseOneHandshake(
          replicationServerInfo.getServerURL(), true);
          replicationServerInfo.getServerURL(), true, false);
        if (replicationServerInfo != null)
        {
@@ -1170,10 +1152,12 @@
   *          Do we keep session opened or not after handshake. Use true if want
   *          to perform handshake phase 2 with the same session and keep the
   *          session to create as the current one.
   * @param isECL
   *          Indicates whether or not an ECL handshake is to be performed.
   * @return The answer from the server. Null if no answer could be obtained.
   */
  private ReplicationServerInfo performPhaseOneHandshake(
      String server, boolean keepConnection)
      String server, boolean keepConnection, boolean isECL)
  {
    int separator = server.lastIndexOf(':');
    String port = server.substring(separator + 1);
@@ -1202,10 +1186,21 @@
          .isSslEncryption(server);
      // Send our ServerStartMsg.
      ServerStartMsg serverStartMsg = new ServerStartMsg(serverId,
          baseDn, maxRcvWindow, heartbeatInterval, state,
          ProtocolVersion.getCurrentVersion(),
          this.getGenerationID(), isSslEncryption, groupId);
      StartMsg serverStartMsg;
      if (!isECL)
      {
        serverStartMsg = new ServerStartMsg(serverId, baseDn,
            maxRcvWindow, heartbeatInterval, state,
            ProtocolVersion.getCurrentVersion(),
            this.getGenerationID(), isSslEncryption, groupId);
      }
      else
      {
        serverStartMsg = new ServerStartECLMsg(baseDn, 0, 0, 0, 0,
            maxRcvWindow, heartbeatInterval, state,
            ProtocolVersion.getCurrentVersion(),
            this.getGenerationID(), isSslEncryption, groupId);
      }
      localSession.publish(serverStartMsg);
      // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
@@ -1236,9 +1231,13 @@
       * replication server will use the same one (or an older one if it is an
       * old replication server).
       */
      protocolVersion = ProtocolVersion.minWithCurrent(replServerInfo
          .getProtocolVersion());
      localSession.setProtocolVersion(protocolVersion);
      final short localProtocolVersion = ProtocolVersion
          .minWithCurrent(replServerInfo.getProtocolVersion());
      if (keepConnection)
      {
        protocolVersion = localProtocolVersion;
      }
      localSession.setProtocolVersion(localProtocolVersion);
      if (!isSslEncryption)
      {
@@ -1318,160 +1317,7 @@
    }
  }
  /**
   * Connect to the provided server performing the first phase handshake
   * (start messages exchange) and return the reply message from the replication
   * server.
   *
   * @param server Server to connect to.
   * @param keepConnection Do we keep session opened or not after handshake.
   *        Use true if want to perform handshake phase 2 with the same session
   *        and keep the session to create as the current one.
   * @return The ReplServerStartDSMsg the server replied. Null if could not
   *         get an answer.
   */
  private ReplServerStartDSMsg performECLPhaseOneHandshake(String server,
    boolean keepConnection)
  {
    // FIXME: this should be merged with performPhaseOneHandshake to avoid
    // code/bug duplication.
    ReplServerStartDSMsg replServerStartDSMsg = null;
    // Parse server string.
    int separator = server.lastIndexOf(':');
    String port = server.substring(separator + 1);
    String hostname = server.substring(0, separator);
    ProtocolSession localSession = null;
    boolean error = false;
    try
    {
      /*
       * Open a socket connection to the next candidate.
       */
      int intPort = Integer.parseInt(port);
      InetSocketAddress serverAddr = new InetSocketAddress(
        InetAddress.getByName(hostname), intPort);
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      socket.connect(serverAddr, 500);
      localSession = replSessionSecurity.createClientSession(socket,
        ReplSessionSecurity.HANDSHAKE_TIMEOUT);
      boolean isSslEncryption =
        replSessionSecurity.isSslEncryption(server);
      // Send our start msg.
      ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg(
        baseDn, 0, 0, 0, 0,
        maxRcvWindow, heartbeatInterval, state,
        ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
        isSslEncryption,
        groupId);
      localSession.publish(serverStartECLMsg);
      // Read the ReplServerStartMsg that should come back.
      replServerStartDSMsg = (ReplServerStartDSMsg) localSession.receive();
      if (debugEnabled())
      {
        debugInfo("In RB for " + baseDn +
          "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() +
          "\nAND RECEIVED:\n" + replServerStartDSMsg.toString());
      }
      // Sanity check
      String repDn = replServerStartDSMsg.getBaseDn();
      if (!(this.baseDn.equals(repDn)))
      {
        Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
          this.baseDn);
        logError(message);
        error = true;
      }
      /*
       * We have sent our own protocol version to the replication server.
       * The replication server will use the same one (or an older one
       * if it is an old replication server).
       */
      if (keepConnection)
        protocolVersion = ProtocolVersion.minWithCurrent(
          replServerStartDSMsg.getVersion());
      localSession.setProtocolVersion(protocolVersion);
      if (!isSslEncryption)
      {
        localSession.stopEncryption();
      }
    } catch (ConnectException e)
    {
      /*
       * There was no server waiting on this host:port
       * Log a notice and try the next replicationServer in the list
       */
      if (!connectionError)
      {
        Message message = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId,
            server, baseDn);
        if (keepConnection) // Log error message only for final connection
        {
          // the error message is only logged once to avoid overflowing
          // the error log
          logError(message);
        } else if (debugEnabled())
        {
          debugInfo(message.toString());
        }
      }
      error = true;
    } catch (Exception e)
    {
      if ((e instanceof SocketTimeoutException) && debugEnabled())
      {
        debugInfo("Timeout trying to connect to RS " + server +
          " for dn: " + baseDn);
      }
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
          server, baseDn, stackTraceToSingleLineString(e));
      if (keepConnection) // Log error message only for final connection
      {
        logError(message);
      } else if (debugEnabled())
      {
        debugInfo(message.toString());
      }
      error = true;
    }
    // Close session if requested
    if (!keepConnection || error)
    {
      if (localSession != null)
      {
        if (debugEnabled()) {
          debugInfo("In RB, closing session after phase 1");
        }
        localSession.close();
        localSession = null;
      }
      if (error)
      {
        replServerStartDSMsg = null;
      } // Be sure to return null.
    }
    // If this connection as the one to use for sending and receiving updates,
    // store it.
    if (keepConnection)
    {
      session = localSession;
    }
    return replServerStartDSMsg;
  }
  /**
   * Performs the second phase handshake (send StartSessionMsg and receive