mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
23.27.2011 6ee1440f6f56ac066f97383315b2798287f0821a
opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -27,135 +27,283 @@
 */
package org.opends.server.replication.protocol;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.DataFormatException;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
/**
 * This class Implement a protocol session using a basic socket and relying on
 * the innate encoding/decoding capabilities of the ReplicationMsg
 * by using the getBytes() and generateMsg() methods of those classes.
 * the innate encoding/decoding capabilities of the ReplicationMsg by using the
 * getBytes() and generateMsg() methods of those classes.
 */
public class SocketSession implements ProtocolSession
public final class SocketSession implements ProtocolSession
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private Socket socket;
  private InputStream input;
  private OutputStream output;
  byte[] rcvLengthBuf = new byte[8];
  private final Socket socket;
  private final InputStream input;
  private final OutputStream output;
  private final byte[] rcvLengthBuf = new byte[8];
  /**
   * The time the last message published to this session.
   */
  private volatile long lastPublishTime = 0;
  /**
   * The time the last message was received on this session.
   */
  private long lastReceiveTime = 0;
  private volatile long lastReceiveTime = 0;
  // Close guarded by closeLock: use a different lock to publish since
  // publishing can block, and we don't want to block while closing failed
  // connections.
  private final Object closeLock = new Object();
  private boolean closeInitiated = false;
  // Publish guarded by publishLock: use a full lock here so that we can
  // optionally publish StopMsg during close.
  private final Lock publishLock = new ReentrantLock();
  // Does not need protecting: updated only during single threaded handshake.
  private short protocolVersion = ProtocolVersion.getCurrentVersion();
  /**
   * Creates a new SocketSession based on the provided socket.
   *
   * @param socket The Socket on which the SocketSession will be based.
   * @throws IOException When an IException happens on the socket.
   * @param socket
   *          The Socket on which the SocketSession will be based.
   * @throws IOException
   *           When an IException happens on the socket.
   */
  public SocketSession(Socket socket) throws IOException
  public SocketSession(final Socket socket) throws IOException
  {
    this.socket = socket;
    /*
     * Use a window instead of the TCP flow control.
     * Therefore set a very large value for send and receive buffer sizes.
     */
    input = socket.getInputStream();
    output = socket.getOutputStream();
  }
  /**
   * {@inheritDoc}
   */
  public void close() throws IOException
  {
    closeInitiated = true;
    if (debugEnabled())
    {
      TRACER.debugInfo("Closing SocketSession."
          + stackTraceToSingleLineString(new Exception()));
      TRACER.debugInfo("Creating SocketSession to %s from %s", socket
          .getRemoteSocketAddress().toString(),
          stackTraceToSingleLineString(new Exception()));
    }
    socket.close();
    this.socket = socket;
    this.input = socket.getInputStream();
    this.output = socket.getOutputStream();
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void publish(ReplicationMsg msg)
         throws IOException
  @Override
  public void close()
  {
    synchronized (closeLock)
    {
      if (closeInitiated)
      {
        return;
      }
      closeInitiated = true;
    }
    // Perform close outside of critical section.
    if (debugEnabled())
    {
      TRACER.debugInfo("Closing SocketSession to %s from %s", socket
          .getRemoteSocketAddress().toString(),
          stackTraceToSingleLineString(new Exception()));
    }
    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      // V4 protocol introduces a StopMsg to properly end communications.
      if (publishLock.tryLock())
      {
        try
        {
          publish(new StopMsg());
        }
        catch (final IOException ignored)
        {
          // Ignore errors on close.
        }
        finally
        {
          publishLock.unlock();
        }
      }
    }
    if (socket != null && !socket.isClosed())
    {
      try
      {
        socket.close();
      }
      catch (final IOException ignored)
      {
        // Ignore errors on close.
      }
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean closeInitiated()
  {
    synchronized (closeLock)
    {
      return closeInitiated;
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public long getLastPublishTime()
  {
    return lastPublishTime;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public long getLastReceiveTime()
  {
    if (lastReceiveTime == 0)
    {
      return System.currentTimeMillis();
    }
    return lastReceiveTime;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String getReadableRemoteAddress()
  {
    return socket.getRemoteSocketAddress().toString();
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String getRemoteAddress()
  {
    return socket.getInetAddress().getHostAddress();
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean isEncrypted()
  {
    return false;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void publish(final ReplicationMsg msg) throws IOException
  {
    publish(msg, ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void publish(ReplicationMsg msg, short reqProtocolVersion)
         throws IOException
  @Override
  public void publish(final ReplicationMsg msg,
      final short reqProtocolVersion) throws IOException
  {
    byte[] buffer = msg.getBytes(reqProtocolVersion);
    String str = String.format("%08x", buffer.length);
    final byte[] buffer = msg.getBytes(reqProtocolVersion);
    final String str = String.format("%08x", buffer.length);
    final byte[] sendLengthBuf = str.getBytes();
    if (debugEnabled())
    publishLock.lock();
    try
    {
      TRACER.debugInfo("SocketSession publish <" + str + ">");
      output.write(sendLengthBuf);
      output.write(buffer);
      output.flush();
    }
    byte[] sendLengthBuf = str.getBytes();
    output.write(sendLengthBuf);
    output.write(buffer);
    output.flush();
    finally
    {
      publishLock.unlock();
    }
    lastPublishTime = System.currentTimeMillis();
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public ReplicationMsg receive() throws IOException,
      ClassNotFoundException, DataFormatException,
      NotSupportedOldVersionPDUException
  {
    /* Read the first 8 bytes containing the packet length */
    // Read the first 8 bytes containing the packet length
    int length = 0;
    /* Let's start the stop-watch before waiting on read */
    /* for the heartbeat check to be operationnal        */
    // Let's start the stop-watch before waiting on read for the heartbeat check
    // to be operational
    lastReceiveTime = System.currentTimeMillis();
    while (length<8)
    while (length < 8)
    {
      int read = input.read(rcvLengthBuf, length, 8-length);
      final int read = input.read(rcvLengthBuf, length, 8 - length);
      if (read == -1)
      {
        lastReceiveTime=0;
        lastReceiveTime = 0;
        throw new IOException("no more data");
      }
      else
@@ -164,101 +312,59 @@
      }
    }
    int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
    final int totalLength = Integer.parseInt(
        new String(rcvLengthBuf), 16);
    try
    {
      length = 0;
      byte[] buffer = new byte[totalLength];
      final byte[] buffer = new byte[totalLength];
      while (length < totalLength)
      {
        length += input.read(buffer, length, totalLength - length);
      }
      /* We do not want the heartbeat to close the session when */
      /* we are processing a message even a time consuming one. */
      lastReceiveTime=0;
      // We do not want the heartbeat to close the session when we are
      // processing a message even a time consuming one.
      lastReceiveTime = 0;
      return ReplicationMsg.generateMsg(buffer, protocolVersion);
    }
    catch (OutOfMemoryError e)
    catch (final OutOfMemoryError e)
    {
      throw new IOException("Packet too large, can't allocate "
                            + totalLength + " bytes.");
          + totalLength + " bytes.");
    }
  }
  /**
   * {@inheritDoc}
   */
  public void stopEncryption()
  {
    // There is no security layer.
  }
  /**
   * {@inheritDoc}
   */
  public boolean isEncrypted()
  @Override
  public void setProtocolVersion(final short version)
  {
    return false;
    protocolVersion = version;
  }
  /**
   * {@inheritDoc}
   */
  public long getLastPublishTime()
  {
    return lastPublishTime;
  }
  /**
   * {@inheritDoc}
   */
  public long getLastReceiveTime()
  {
    if (lastReceiveTime==0)
    {
      return System.currentTimeMillis();
    }
    return lastReceiveTime;
  }
  /**
   * {@inheritDoc}
   */
  public String getRemoteAddress()
  {
    return socket.getInetAddress().getHostAddress();
  }
  /**
   * {@inheritDoc}
   */
  public String getReadableRemoteAddress()
  {
    return socket.getRemoteSocketAddress().toString();
  }
  /**
   * {@inheritDoc}
   */
  public void setSoTimeout(int timeout) throws SocketException
  @Override
  public void setSoTimeout(final int timeout) throws SocketException
  {
    socket.setSoTimeout(timeout);
  }
  /**
   * {@inheritDoc}
   */
  public boolean closeInitiated()
  {
    return closeInitiated;
  }
  /**
   * {@inheritDoc}
   */
  public void setProtocolVersion(short version)
  @Override
  public void stopEncryption()
  {
    protocolVersion = version;
    // There is no security layer.
  }
}