| | |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.io.IOException; |
| | | import java.io.InputStream; |
| | | import java.io.OutputStream; |
| | | import java.net.Socket; |
| | | import java.net.SocketException; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | |
| | | |
| | | /** |
| | | * This class Implement a protocol session using a basic socket and relying on |
| | | * the innate encoding/decoding capabilities of the ReplicationMsg |
| | | * by using the getBytes() and generateMsg() methods of those classes. |
| | | * the innate encoding/decoding capabilities of the ReplicationMsg by using the |
| | | * getBytes() and generateMsg() methods of those classes. |
| | | */ |
| | | public class SocketSession implements ProtocolSession |
| | | public final class SocketSession implements ProtocolSession |
| | | { |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | private Socket socket; |
| | | private InputStream input; |
| | | private OutputStream output; |
| | | byte[] rcvLengthBuf = new byte[8]; |
| | | private final Socket socket; |
| | | private final InputStream input; |
| | | private final OutputStream output; |
| | | private final byte[] rcvLengthBuf = new byte[8]; |
| | | |
| | | /** |
| | | * The time the last message published to this session. |
| | | */ |
| | | private volatile long lastPublishTime = 0; |
| | | |
| | | |
| | | /** |
| | | * The time the last message was received on this session. |
| | | */ |
| | | private long lastReceiveTime = 0; |
| | | private volatile long lastReceiveTime = 0; |
| | | |
| | | // Close guarded by closeLock: use a different lock to publish since |
| | | // publishing can block, and we don't want to block while closing failed |
| | | // connections. |
| | | private final Object closeLock = new Object(); |
| | | private boolean closeInitiated = false; |
| | | |
| | | // Publish guarded by publishLock: use a full lock here so that we can |
| | | // optionally publish StopMsg during close. |
| | | private final Lock publishLock = new ReentrantLock(); |
| | | |
| | | // Does not need protecting: updated only during single threaded handshake. |
| | | private short protocolVersion = ProtocolVersion.getCurrentVersion(); |
| | | |
| | | |
| | | |
| | | /** |
| | | * Creates a new SocketSession based on the provided socket. |
| | | * |
| | | * @param socket The Socket on which the SocketSession will be based. |
| | | * @throws IOException When an IException happens on the socket. |
| | | * @param socket |
| | | * The Socket on which the SocketSession will be based. |
| | | * @throws IOException |
| | | * When an IException happens on the socket. |
| | | */ |
| | | public SocketSession(Socket socket) throws IOException |
| | | public SocketSession(final Socket socket) throws IOException |
| | | { |
| | | this.socket = socket; |
| | | /* |
| | | * Use a window instead of the TCP flow control. |
| | | * Therefore set a very large value for send and receive buffer sizes. |
| | | */ |
| | | input = socket.getInputStream(); |
| | | output = socket.getOutputStream(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void close() throws IOException |
| | | { |
| | | closeInitiated = true; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Closing SocketSession." |
| | | + stackTraceToSingleLineString(new Exception())); |
| | | TRACER.debugInfo("Creating SocketSession to %s from %s", socket |
| | | .getRemoteSocketAddress().toString(), |
| | | stackTraceToSingleLineString(new Exception())); |
| | | } |
| | | socket.close(); |
| | | |
| | | this.socket = socket; |
| | | this.input = socket.getInputStream(); |
| | | this.output = socket.getOutputStream(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public synchronized void publish(ReplicationMsg msg) |
| | | throws IOException |
| | | @Override |
| | | public void close() |
| | | { |
| | | synchronized (closeLock) |
| | | { |
| | | if (closeInitiated) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | closeInitiated = true; |
| | | } |
| | | |
| | | // Perform close outside of critical section. |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Closing SocketSession to %s from %s", socket |
| | | .getRemoteSocketAddress().toString(), |
| | | stackTraceToSingleLineString(new Exception())); |
| | | } |
| | | |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // V4 protocol introduces a StopMsg to properly end communications. |
| | | if (publishLock.tryLock()) |
| | | { |
| | | try |
| | | { |
| | | publish(new StopMsg()); |
| | | } |
| | | catch (final IOException ignored) |
| | | { |
| | | // Ignore errors on close. |
| | | } |
| | | finally |
| | | { |
| | | publishLock.unlock(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (socket != null && !socket.isClosed()) |
| | | { |
| | | try |
| | | { |
| | | socket.close(); |
| | | } |
| | | catch (final IOException ignored) |
| | | { |
| | | // Ignore errors on close. |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public boolean closeInitiated() |
| | | { |
| | | synchronized (closeLock) |
| | | { |
| | | return closeInitiated; |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public long getLastPublishTime() |
| | | { |
| | | return lastPublishTime; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public long getLastReceiveTime() |
| | | { |
| | | if (lastReceiveTime == 0) |
| | | { |
| | | return System.currentTimeMillis(); |
| | | } |
| | | return lastReceiveTime; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public String getReadableRemoteAddress() |
| | | { |
| | | return socket.getRemoteSocketAddress().toString(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public String getRemoteAddress() |
| | | { |
| | | return socket.getInetAddress().getHostAddress(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public boolean isEncrypted() |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void publish(final ReplicationMsg msg) throws IOException |
| | | { |
| | | publish(msg, ProtocolVersion.getCurrentVersion()); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public synchronized void publish(ReplicationMsg msg, short reqProtocolVersion) |
| | | throws IOException |
| | | @Override |
| | | public void publish(final ReplicationMsg msg, |
| | | final short reqProtocolVersion) throws IOException |
| | | { |
| | | byte[] buffer = msg.getBytes(reqProtocolVersion); |
| | | String str = String.format("%08x", buffer.length); |
| | | final byte[] buffer = msg.getBytes(reqProtocolVersion); |
| | | final String str = String.format("%08x", buffer.length); |
| | | final byte[] sendLengthBuf = str.getBytes(); |
| | | |
| | | if (debugEnabled()) |
| | | publishLock.lock(); |
| | | try |
| | | { |
| | | TRACER.debugInfo("SocketSession publish <" + str + ">"); |
| | | output.write(sendLengthBuf); |
| | | output.write(buffer); |
| | | output.flush(); |
| | | } |
| | | |
| | | byte[] sendLengthBuf = str.getBytes(); |
| | | |
| | | output.write(sendLengthBuf); |
| | | output.write(buffer); |
| | | output.flush(); |
| | | finally |
| | | { |
| | | publishLock.unlock(); |
| | | } |
| | | |
| | | lastPublishTime = System.currentTimeMillis(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public ReplicationMsg receive() throws IOException, |
| | | ClassNotFoundException, DataFormatException, |
| | | NotSupportedOldVersionPDUException |
| | | { |
| | | /* Read the first 8 bytes containing the packet length */ |
| | | // Read the first 8 bytes containing the packet length |
| | | int length = 0; |
| | | |
| | | /* Let's start the stop-watch before waiting on read */ |
| | | /* for the heartbeat check to be operationnal */ |
| | | // Let's start the stop-watch before waiting on read for the heartbeat check |
| | | // to be operational |
| | | lastReceiveTime = System.currentTimeMillis(); |
| | | |
| | | while (length<8) |
| | | while (length < 8) |
| | | { |
| | | int read = input.read(rcvLengthBuf, length, 8-length); |
| | | final int read = input.read(rcvLengthBuf, length, 8 - length); |
| | | if (read == -1) |
| | | { |
| | | lastReceiveTime=0; |
| | | lastReceiveTime = 0; |
| | | throw new IOException("no more data"); |
| | | } |
| | | else |
| | |
| | | } |
| | | } |
| | | |
| | | int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16); |
| | | final int totalLength = Integer.parseInt( |
| | | new String(rcvLengthBuf), 16); |
| | | |
| | | try |
| | | { |
| | | length = 0; |
| | | byte[] buffer = new byte[totalLength]; |
| | | final byte[] buffer = new byte[totalLength]; |
| | | while (length < totalLength) |
| | | { |
| | | length += input.read(buffer, length, totalLength - length); |
| | | } |
| | | /* We do not want the heartbeat to close the session when */ |
| | | /* we are processing a message even a time consuming one. */ |
| | | lastReceiveTime=0; |
| | | // We do not want the heartbeat to close the session when we are |
| | | // processing a message even a time consuming one. |
| | | lastReceiveTime = 0; |
| | | return ReplicationMsg.generateMsg(buffer, protocolVersion); |
| | | } |
| | | catch (OutOfMemoryError e) |
| | | catch (final OutOfMemoryError e) |
| | | { |
| | | throw new IOException("Packet too large, can't allocate " |
| | | + totalLength + " bytes."); |
| | | + totalLength + " bytes."); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void stopEncryption() |
| | | { |
| | | // There is no security layer. |
| | | } |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public boolean isEncrypted() |
| | | @Override |
| | | public void setProtocolVersion(final short version) |
| | | { |
| | | return false; |
| | | protocolVersion = version; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public long getLastPublishTime() |
| | | { |
| | | return lastPublishTime; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public long getLastReceiveTime() |
| | | { |
| | | if (lastReceiveTime==0) |
| | | { |
| | | return System.currentTimeMillis(); |
| | | } |
| | | return lastReceiveTime; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public String getRemoteAddress() |
| | | { |
| | | return socket.getInetAddress().getHostAddress(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public String getReadableRemoteAddress() |
| | | { |
| | | return socket.getRemoteSocketAddress().toString(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void setSoTimeout(int timeout) throws SocketException |
| | | @Override |
| | | public void setSoTimeout(final int timeout) throws SocketException |
| | | { |
| | | socket.setSoTimeout(timeout); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public boolean closeInitiated() |
| | | { |
| | | return closeInitiated; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void setProtocolVersion(short version) |
| | | @Override |
| | | public void stopEncryption() |
| | | { |
| | | protocolVersion = version; |
| | | // There is no security layer. |
| | | } |
| | | } |