mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
20.02.2013 a32bc83b64f0f82d731980a433a2180fe4f366f3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/Session.java
File was renamed from opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -40,21 +40,25 @@
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;
import javax.net.ssl.SSLSocket;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.util.StaticUtils;
/**
 * This class implements a protocol session using TLS.
 * This class defines a replication session using TLS.
 */
public final class TLSSocketSession implements ProtocolSession
public final class Session extends DirectoryThread
{
  /**
   * The tracer object for the debug logger.
@@ -86,7 +90,7 @@
   * connections.
   */
  private final Object stateLock = new Object();
  private boolean closeInitiated = false;
  private volatile boolean closeInitiated = false;
  private Throwable sessionError = null;
  /*
@@ -113,10 +117,13 @@
   */
  private BufferedOutputStream output;
  private final LinkedBlockingQueue<byte[]> sendQueue =
      new LinkedBlockingQueue<byte[]>(4000);
  private AtomicBoolean isRunning = new AtomicBoolean(false);
  private final CountDownLatch latch = new CountDownLatch(1);
  /**
   * Creates a new TLSSocketSession.
   * Creates a new Session.
   *
   * @param socket
   *          The regular Socket on which the SocketSession will be based.
@@ -125,13 +132,15 @@
   * @throws IOException
   *           When an IException happens on the socket.
   */
  public TLSSocketSession(final Socket socket,
      final SSLSocket secureSocket) throws IOException
  public Session(final Socket socket,
                 final SSLSocket secureSocket) throws IOException
  {
    super("Replication Session from "+ socket.getLocalSocketAddress() +
        " to " + socket.getRemoteSocketAddress());
    if (debugEnabled())
    {
      TRACER.debugInfo(
          "Creating TLSSocketSession from %s to %s in %s",
          "Creating Session from %s to %s in %s",
          socket.getLocalSocketAddress(),
          socket.getRemoteSocketAddress(),
          stackTraceToSingleLineString(new Exception()));
@@ -153,9 +162,9 @@
  /**
   * {@inheritDoc}
   * This method is called when the session with the remote must be closed.
   * This object won't be used anymore after this method is called.
   */
  @Override
  public void close()
  {
    Throwable localSessionError;
@@ -171,13 +180,21 @@
      closeInitiated = true;
    }
    try {
      this.interrupt();
      this.join();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    // Perform close outside of critical section.
    if (debugEnabled())
    {
      if (localSessionError == null)
      {
        TRACER.debugInfo(
            "Closing TLSSocketSession from %s to %s in %s",
            "Closing Session from %s to %s in %s",
            plainSocket.getLocalSocketAddress(),
            plainSocket.getRemoteSocketAddress(),
            stackTraceToSingleLineString(new Exception()));
@@ -185,7 +202,7 @@
      else
      {
        TRACER.debugInfo(
            "Aborting TLSSocketSession from %s to %s in %s due to the "
            "Aborting Session from %s to %s in %s due to the "
                + "following error: %s",
            plainSocket.getLocalSocketAddress(),
            plainSocket.getRemoteSocketAddress(),
@@ -199,20 +216,13 @@
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        if (publishLock.tryLock())
        try
        {
          try
          {
            publish(new StopMsg());
          }
          catch (final IOException ignored)
          {
            // Ignore errors on close.
          }
          finally
          {
            publishLock.unlock();
          }
          publish(new StopMsg());
        }
        catch (final IOException ignored)
        {
          // Ignore errors on close.
        }
      }
    }
@@ -223,9 +233,12 @@
  /**
   * {@inheritDoc}
   * This methods allows to determine if the session close was initiated
   * on this Session.
   *
   * @return A boolean allowing to determine if the session close was initiated
   * on this Session.
   */
  @Override
  public boolean closeInitiated()
  {
    synchronized (stateLock)
@@ -237,9 +250,10 @@
  /**
   * {@inheritDoc}
   * Gets the time the last replication message was published on this
   * session.
   * @return The timestamp in milliseconds of the last message published.
   */
  @Override
  public long getLastPublishTime()
  {
    return lastPublishTime;
@@ -248,9 +262,10 @@
  /**
   * {@inheritDoc}
   * Gets the time the last replication message was received on this
   * session.
   * @return The timestamp in milliseconds of the last message received.
   */
  @Override
  public long getLastReceiveTime()
  {
    if (lastReceiveTime == 0)
@@ -263,9 +278,10 @@
  /**
   * {@inheritDoc}
   * Retrieve the local URL in the form host:port.
   *
   * @return The local URL.
   */
  @Override
  public String getLocalUrl()
  {
    return localUrl;
@@ -274,9 +290,10 @@
  /**
   * {@inheritDoc}
   * Retrieve the human readable address of the remote server.
   *
   * @return The human readable address of the remote server.
   */
  @Override
  public String getReadableRemoteAddress()
  {
    return readableRemoteAddress;
@@ -285,9 +302,10 @@
  /**
   * {@inheritDoc}
   * Retrieve the IP address of the remote server.
   *
   * @return The IP address of the remote server.
   */
  @Override
  public String getRemoteAddress()
  {
    return remoteAddress;
@@ -296,9 +314,9 @@
  /**
   * {@inheritDoc}
   * Determine whether the session is using a security layer.
   * @return true if the connection is encrypted, false otherwise.
   */
  @Override
  public boolean isEncrypted()
  {
    return isEncrypted;
@@ -307,12 +325,38 @@
  /**
   * {@inheritDoc}
   * Sends a replication message to the remote peer.
   *
   * @param msg
   *          The message to be sent.
   * @throws IOException
   *           If an IO error occurred.
   */
  @Override
  public void publish(final ReplicationMsg msg) throws IOException
  {
    final byte[] buffer = msg.getBytes(protocolVersion);
    if (isRunning.get())
    {
      try {
        sendQueue.put(buffer);
      }
      catch (final InterruptedException e) {
        setSessionError(e);
        throw new IOException(e.getMessage());
      }
    } else {
      send(buffer);
    }
  }
  /** Sends a replication message already encoded to the socket.
   *
   * @param buffer
   *          the encoded buffer
   * @throws IOException if the message could not be sent
   */
  private void send(final byte[] buffer) throws IOException
  {
    final String str = String.format("%08x", buffer.length);
    final byte[] sendLengthBuf = str.getBytes();
@@ -326,9 +370,7 @@
      output.write(sendLengthBuf);
      output.write(buffer);
      output.flush();
    }
    catch (final IOException e)
    {
    } catch (final IOException e) {
      setSessionError(e);
      throw e;
    }
@@ -343,9 +385,20 @@
  /**
   * {@inheritDoc}
   * Attempt to receive a ReplicationMsg.
   * This method should block the calling thread until a
   * ReplicationMsg is available or until an error condition.
   *
   * This method can only be called by a single thread and therefore does not
   * need to implement any replication.
   *
   * @return The ReplicationMsg that was received.
   * @throws IOException When error happened during IO process.
   * @throws DataFormatException When the data received is not formatted as a
   *         ReplicationMsg.
   * @throws NotSupportedOldVersionPDUException If the received PDU is part of
   * an old protocol version and we do not support it.
   */
  @Override
  public ReplicationMsg receive() throws IOException,
      DataFormatException, NotSupportedOldVersionPDUException
  {
@@ -432,22 +485,23 @@
  }
  /**
   * {@inheritDoc}
   * This method is called at the establishment of the session and can
   * be used to record the version of the protocol that is currently used.
   *
   * @param version The version of the protocol that is currently used.
   */
  @Override
  public void setProtocolVersion(final short version)
  {
    protocolVersion = version;
  }
  /**
   * {@inheritDoc}
   * Returns the version of the protocol that is currently used.
   *
   * @return The version of the protocol that is currently used.
   */
  @Override
  public short getProtocolVersion()
  {
    return protocolVersion;
@@ -456,9 +510,16 @@
  /**
   * {@inheritDoc}
   * Set a timeout value.
   * With this option set to a non-zero value, calls to the receive() method
   * block for only this amount of time after which a
   * java.net.SocketTimeoutException is raised.
   * The Broker is valid and usable even after such an Exception is raised.
   *
   * @param timeout the specified timeout, in milliseconds.
   * @throws SocketException if there is an error in the underlying protocol,
   *         such as a TCP error.
   */
  @Override
  public void setSoTimeout(final int timeout) throws SocketException
  {
    plainSocket.setSoTimeout(timeout);
@@ -467,10 +528,8 @@
  /**
   * {@inheritDoc}
   * Stop using the security layer, if there is any.
   */
  @SuppressWarnings("unused")
  @Override
  public void stopEncryption()
  {
    /*
@@ -500,4 +559,59 @@
      }
    }
  }
  /**
   * Run method for the Session.
   * Loops waiting for buffers from the queue and sends them when available.
   */
  public void run()
  {
    isRunning.set(true);
    latch.countDown();
    if (debugEnabled())
    {
      TRACER.debugInfo(this.getName() + " starting.");
    }
    boolean needClosing = false;
    while (!closeInitiated)
    {
      byte[] buffer;
      try
      {
        buffer = sendQueue.take();
      }
      catch (InterruptedException ie)
      {
        break;
      }
      try
      {
        send(buffer);
      }
      catch (IOException e)
      {
        setSessionError(e);
        needClosing = true;
      }
    }
    isRunning.set(false);
    if (needClosing)
    {
      close();
    }
    if (debugEnabled())
    {
      TRACER.debugInfo(this.getName() + " stopped.");
    }
  }
  /**
   * This method can be called to wait until the session thread is
   * properly started.
   * @throws InterruptedException when interrupted
   */
  public void waitForStartup() throws InterruptedException
  {
    latch.await();
  }
}