| File was renamed from opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java |
| | |
| | | import java.io.OutputStream; |
| | | import java.net.Socket; |
| | | import java.net.SocketException; |
| | | import java.util.concurrent.CountDownLatch; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import javax.net.ssl.SSLSocket; |
| | | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | |
| | | |
| | | /** |
| | | * This class implements a protocol session using TLS. |
| | | * This class defines a replication session using TLS. |
| | | */ |
| | | public final class TLSSocketSession implements ProtocolSession |
| | | public final class Session extends DirectoryThread |
| | | { |
| | | /** |
| | | * The tracer object for the debug logger. |
| | |
| | | * connections. |
| | | */ |
| | | private final Object stateLock = new Object(); |
| | | private boolean closeInitiated = false; |
| | | private volatile boolean closeInitiated = false; |
| | | private Throwable sessionError = null; |
| | | |
| | | /* |
| | |
| | | */ |
| | | private BufferedOutputStream output; |
| | | |
| | | |
| | | private final LinkedBlockingQueue<byte[]> sendQueue = |
| | | new LinkedBlockingQueue<byte[]>(4000); |
| | | private AtomicBoolean isRunning = new AtomicBoolean(false); |
| | | private final CountDownLatch latch = new CountDownLatch(1); |
| | | |
| | | /** |
| | | * Creates a new TLSSocketSession. |
| | | * Creates a new Session. |
| | | * |
| | | * @param socket |
| | | * The regular Socket on which the SocketSession will be based. |
| | |
| | | * @throws IOException |
| | | * When an IOException happens on the socket. |
| | | */ |
| | | public TLSSocketSession(final Socket socket, |
| | | final SSLSocket secureSocket) throws IOException |
| | | public Session(final Socket socket, |
| | | final SSLSocket secureSocket) throws IOException |
| | | { |
| | | super("Replication Session from "+ socket.getLocalSocketAddress() + |
| | | " to " + socket.getRemoteSocketAddress()); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "Creating TLSSocketSession from %s to %s in %s", |
| | | "Creating Session from %s to %s in %s", |
| | | socket.getLocalSocketAddress(), |
| | | socket.getRemoteSocketAddress(), |
| | | stackTraceToSingleLineString(new Exception())); |
| | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * This method is called when the session with the remote must be closed. |
| | | * This object won't be used anymore after this method is called. |
| | | */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | Throwable localSessionError; |
| | |
| | | closeInitiated = true; |
| | | } |
| | | |
| | | try { |
| | | this.interrupt(); |
| | | this.join(); |
| | | } |
| | | catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | |
| | | // Perform close outside of critical section. |
| | | if (debugEnabled()) |
| | | { |
| | | if (localSessionError == null) |
| | | { |
| | | TRACER.debugInfo( |
| | | "Closing TLSSocketSession from %s to %s in %s", |
| | | "Closing Session from %s to %s in %s", |
| | | plainSocket.getLocalSocketAddress(), |
| | | plainSocket.getRemoteSocketAddress(), |
| | | stackTraceToSingleLineString(new Exception())); |
| | |
| | | else |
| | | { |
| | | TRACER.debugInfo( |
| | | "Aborting TLSSocketSession from %s to %s in %s due to the " |
| | | "Aborting Session from %s to %s in %s due to the " |
| | | + "following error: %s", |
| | | plainSocket.getLocalSocketAddress(), |
| | | plainSocket.getRemoteSocketAddress(), |
| | |
| | | { |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | if (publishLock.tryLock()) |
| | | try |
| | | { |
| | | try |
| | | { |
| | | publish(new StopMsg()); |
| | | } |
| | | catch (final IOException ignored) |
| | | { |
| | | // Ignore errors on close. |
| | | } |
| | | finally |
| | | { |
| | | publishLock.unlock(); |
| | | } |
| | | publish(new StopMsg()); |
| | | } |
| | | catch (final IOException ignored) |
| | | { |
| | | // Ignore errors on close. |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * This method allows determining whether the session close was initiated |
| | | * on this Session. |
| | | * |
| | | * @return A boolean allowing to determine if the session close was initiated |
| | | * on this Session. |
| | | */ |
| | | @Override |
| | | public boolean closeInitiated() |
| | | { |
| | | synchronized (stateLock) |
| | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * Gets the time the last replication message was published on this |
| | | * session. |
| | | * @return The timestamp in milliseconds of the last message published. |
| | | */ |
| | | @Override |
| | | public long getLastPublishTime() |
| | | { |
| | | return lastPublishTime; |
| | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * Gets the time the last replication message was received on this |
| | | * session. |
| | | * @return The timestamp in milliseconds of the last message received. |
| | | */ |
| | | @Override |
| | | public long getLastReceiveTime() |
| | | { |
| | | if (lastReceiveTime == 0) |
| | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * Retrieve the local URL in the form host:port. |
| | | * |
| | | * @return The local URL. |
| | | */ |
| | | @Override |
| | | public String getLocalUrl() |
| | | { |
| | | return localUrl; |
| | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * Retrieve the human readable address of the remote server. |
| | | * |
| | | * @return The human readable address of the remote server. |
| | | */ |
| | | @Override |
| | | public String getReadableRemoteAddress() |
| | | { |
| | | return readableRemoteAddress; |
| | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * Retrieve the IP address of the remote server. |
| | | * |
| | | * @return The IP address of the remote server. |
| | | */ |
| | | @Override |
| | | public String getRemoteAddress() |
| | | { |
| | | return remoteAddress; |
| | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * Determine whether the session is using a security layer. |
| | | * @return true if the connection is encrypted, false otherwise. |
| | | */ |
| | | @Override |
| | | public boolean isEncrypted() |
| | | { |
| | | return isEncrypted; |
| | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * Sends a replication message to the remote peer. |
| | | * |
| | | * @param msg |
| | | * The message to be sent. |
| | | * @throws IOException |
| | | * If an IO error occurred. |
| | | */ |
| | | @Override |
| | | public void publish(final ReplicationMsg msg) throws IOException |
| | | { |
| | | // Encode the message for the protocol version currently recorded on this session. |
| | | final byte[] buffer = msg.getBytes(protocolVersion); |
| | | if (isRunning.get()) |
| | | { |
| | | // The sender thread is running: hand the encoded message over to it. |
| | | // put() blocks while the bounded send queue is full. |
| | | try { |
| | | sendQueue.put(buffer); |
| | | } |
| | | catch (final InterruptedException e) { |
| | | setSessionError(e); |
| | | // Restore the interrupt status so the caller can still observe it. |
| | | Thread.currentThread().interrupt(); |
| | | // Keep the interruption as the cause rather than only its (often null) message. |
| | | final IOException ioe = new IOException(e.getMessage()); |
| | | ioe.initCause(e); |
| | | throw ioe; |
| | | } |
| | | } else { |
| | | // The sender thread is not running: write directly from the calling thread. |
| | | send(buffer); |
| | | } |
| | | } |
| | | |
| | | /** Sends a replication message already encoded to the socket. |
| | | * |
| | | * @param buffer |
| | | * the encoded buffer |
| | | * @throws IOException if the message could not be sent |
| | | */ |
| | | private void send(final byte[] buffer) throws IOException |
| | | { |
| | | final String str = String.format("%08x", buffer.length); |
| | | final byte[] sendLengthBuf = str.getBytes(); |
| | | |
| | |
| | | output.write(sendLengthBuf); |
| | | output.write(buffer); |
| | | output.flush(); |
| | | } |
| | | catch (final IOException e) |
| | | { |
| | | } catch (final IOException e) { |
| | | setSessionError(e); |
| | | throw e; |
| | | } |
| | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * Attempt to receive a ReplicationMsg. |
| | | * This method should block the calling thread until a |
| | | * ReplicationMsg is available or until an error condition. |
| | | * |
| | | * This method can only be called by a single thread and therefore does not |
| | | * need to implement any synchronization. |
| | | * |
| | | * @return The ReplicationMsg that was received. |
| | | * @throws IOException When error happened during IO process. |
| | | * @throws DataFormatException When the data received is not formatted as a |
| | | * ReplicationMsg. |
| | | * @throws NotSupportedOldVersionPDUException If the received PDU is part of |
| | | * an old protocol version and we do not support it. |
| | | */ |
| | | @Override |
| | | public ReplicationMsg receive() throws IOException, |
| | | DataFormatException, NotSupportedOldVersionPDUException |
| | | { |
| | |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * This method is called at the establishment of the session and can |
| | | * be used to record the version of the protocol that is currently used. |
| | | * |
| | | * @param version The version of the protocol that is currently used. |
| | | */ |
| | | @Override |
| | | public void setProtocolVersion(final short version) |
| | | { |
| | | // Record the protocol version; publish() reads it when encoding messages. |
| | | this.protocolVersion = version; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * Returns the version of the protocol that is currently used. |
| | | * |
| | | * @return The version of the protocol that is currently used. |
| | | */ |
| | | @Override |
| | | public short getProtocolVersion() |
| | | { |
| | | return protocolVersion; |
| | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * Set a timeout value. |
| | | * With this option set to a non-zero value, calls to the receive() method |
| | | * block for only this amount of time after which a |
| | | * java.net.SocketTimeoutException is raised. |
| | | * The Broker is valid and usable even after such an Exception is raised. |
| | | * |
| | | * @param timeout the specified timeout, in milliseconds. |
| | | * @throws SocketException if there is an error in the underlying protocol, |
| | | * such as a TCP error. |
| | | */ |
| | | @Override |
| | | public void setSoTimeout(final int timeout) throws SocketException |
| | | { |
| | | plainSocket.setSoTimeout(timeout); |
| | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * Stop using the security layer, if there is any. |
| | | */ |
| | | @SuppressWarnings("unused") |
| | | @Override |
| | | public void stopEncryption() |
| | | { |
| | | /* |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Run method for the Session. |
| | | * Loops waiting for buffers from the queue and sends them when available. |
| | | */ |
| | | public void run() |
| | | { |
| | | // Mark the sender as running and release any thread blocked in waitForStartup(). |
| | | isRunning.set(true); |
| | | latch.countDown(); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(this.getName() + " starting."); |
| | | } |
| | | boolean needClosing = false; |
| | | while (!closeInitiated) |
| | | { |
| | | byte[] buffer; |
| | | try |
| | | { |
| | | // Blocks until a queued message is available. |
| | | buffer = sendQueue.take(); |
| | | } |
| | | catch (InterruptedException ie) |
| | | { |
| | | // Interrupted while waiting for a message: leave the sender loop. |
| | | break; |
| | | } |
| | | try |
| | | { |
| | | send(buffer); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | setSessionError(e); |
| | | needClosing = true; |
| | | // Leave the loop after a failed write. Otherwise the loop keeps running |
| | | // until another thread sets closeInitiated, and the close() below is |
| | | // never reached on this thread's own initiative. |
| | | break; |
| | | } |
| | | } |
| | | // From here on publish() writes directly instead of queueing. |
| | | isRunning.set(false); |
| | | if (needClosing) |
| | | { |
| | | close(); |
| | | } |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(this.getName() + " stopped."); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This method can be called to wait until the session thread is |
| | | * properly started. |
| | | * @throws InterruptedException when interrupted |
| | | */ |
| | | public void waitForStartup() throws InterruptedException |
| | | { |
| | | // Returns once run() has counted the start-up latch down. |
| | | this.latch.await(); |
| | | } |
| | | } |