| | |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | |
| | | |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.*; |
| | | import java.net.Socket; |
| | | import java.net.SocketException; |
| | | import java.util.concurrent.CountDownLatch; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | |
| | | |
| | | /** |
| | | * This class defines a replication session using TLS. |
| | | */ |
| | |
| | | */ |
| | | private final Object stateLock = new Object(); |
| | | private volatile boolean closeInitiated = false; |
| | | private Throwable sessionError = null; |
| | | private Throwable sessionError; |
| | | |
| | | /** |
| | | * Publish guarded by publishLock: use a full lock here so that we can |
| | |
| | | } |
| | | |
| | | // V4 protocol introduces a StopMsg to properly end communications. |
| | | if (localSessionError == null) |
| | | if (localSessionError == null |
| | | && protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | try |
| | | { |
| | | try |
| | | { |
| | | publish(new StopMsg()); |
| | | } |
| | | catch (final IOException ignored) |
| | | { |
| | | // Ignore errors on close. |
| | | } |
| | | publish(new StopMsg()); |
| | | } |
| | | catch (final IOException ignored) |
| | | { |
| | | // Ignore errors on close. |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | try |
| | | { |
| | | // Read the first 8 bytes containing the packet length. |
| | | int length = 0; |
| | | |
| | | /* |
| | | * Let's start the stop-watch before waiting on read for the heartbeat |
| | | * check to be operational. |
| | | */ |
| | | lastReceiveTime = System.currentTimeMillis(); |
| | | |
| | | while (length < 8) |
| | | { |
| | | final int read = input.read(rcvLengthBuf, length, 8 - length); |
| | | if (read == -1) |
| | | { |
| | | lastReceiveTime = 0; |
| | | throw new IOException("no more data"); |
| | | } |
| | | else |
| | | { |
| | | length += read; |
| | | } |
| | | } |
| | | |
| | | final int totalLength = Integer.parseInt(new String( |
| | | rcvLengthBuf), 16); |
| | | // Read the first 8 bytes containing the packet length. |
| | | read(rcvLengthBuf); |
| | | final int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16); |
| | | |
| | | try |
| | | { |
| | | length = 0; |
| | | final byte[] buffer = new byte[totalLength]; |
| | | while (length < totalLength) |
| | | { |
| | | final int read = input.read(buffer, length, totalLength |
| | | - length); |
| | | if (read == -1) |
| | | { |
| | | lastReceiveTime = 0; |
| | | throw new IOException("no more data"); |
| | | } |
| | | else |
| | | { |
| | | length += read; |
| | | } |
| | | } |
| | | read(buffer); |
| | | |
| | | /* |
| | | * We do not want the heartbeat to close the session when we are |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Fills the supplied buffer completely with bytes taken from the |
| | | * session input stream, issuing as many reads as needed. |
| | | * <p> |
| | | * When the stream reports end-of-stream before the buffer is full, |
| | | * {@code lastReceiveTime} is reset to 0 and an exception is thrown. |
| | | * |
| | | * @param buffer |
| | | * The destination array; its whole length is filled. A zero-length |
| | | * array results in no read being attempted. |
| | | * @throws IOException |
| | | * If the stream ends before the buffer is full, or if the |
| | | * underlying read fails. |
| | | */ |
| | | private void read(byte[] buffer) throws IOException |
| | | { |
| | | int offset = 0; |
| | | int remaining = buffer.length; |
| | | while (remaining > 0) |
| | | { |
| | | final int count = input.read(buffer, offset, remaining); |
| | | if (count == -1) |
| | | { |
| | | // End of stream: clear the receive timestamp before failing. |
| | | lastReceiveTime = 0; |
| | | throw new IOException("no more data"); |
| | | } |
| | | offset += count; |
| | | remaining -= count; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This method is called at the establishment of the session and can |