mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.29.2014 c2bc444bdc7e4ac083bbdf72ce8eb77ff020f662
opendj3-server-dev/src/server/org/opends/server/replication/protocol/Session.java
@@ -26,16 +26,14 @@
 */
package org.opends.server.replication.protocol;
import static org.opends.server.util.StaticUtils.*;
import java.io.*;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -47,8 +45,6 @@
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.util.StaticUtils;
/**
 * This class defines a replication session using TLS.
 */
@@ -82,7 +78,7 @@
   */
  private final Object stateLock = new Object();
  private volatile boolean closeInitiated = false;
  private Throwable sessionError = null;
  private Throwable sessionError;
  /**
   * Publish guarded by publishLock: use a full lock here so that we can
@@ -204,18 +200,16 @@
    }
    // V4 protocol introduces a StopMsg to properly end communications.
    if (localSessionError == null)
    if (localSessionError == null
        && protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      try
      {
        try
        {
          publish(new StopMsg());
        }
        catch (final IOException ignored)
        {
          // Ignore errors on close.
        }
        publish(new StopMsg());
      }
      catch (final IOException ignored)
      {
        // Ignore errors on close.
      }
    }
@@ -407,50 +401,20 @@
  {
    try
    {
      // Read the first 8 bytes containing the packet length.
      int length = 0;
      /*
       * Let's start the stop-watch before waiting on read for the heartbeat
       * check to be operational.
       */
      lastReceiveTime = System.currentTimeMillis();
      while (length < 8)
      {
        final int read = input.read(rcvLengthBuf, length, 8 - length);
        if (read == -1)
        {
          lastReceiveTime = 0;
          throw new IOException("no more data");
        }
        else
        {
          length += read;
        }
      }
      final int totalLength = Integer.parseInt(new String(
          rcvLengthBuf), 16);
      // Read the first 8 bytes containing the packet length.
      read(rcvLengthBuf);
      final int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
      try
      {
        length = 0;
        final byte[] buffer = new byte[totalLength];
        while (length < totalLength)
        {
          final int read = input.read(buffer, length, totalLength
              - length);
          if (read == -1)
          {
            lastReceiveTime = 0;
            throw new IOException("no more data");
          }
          else
          {
            length += read;
          }
        }
        read(buffer);
        /*
         * We do not want the heartbeat to close the session when we are
@@ -487,6 +451,21 @@
    }
  }
  private void read(byte[] buffer) throws IOException
  {
    final int totalLength = buffer.length;
    int length = 0;
    while (length < totalLength)
    {
      final int read = input.read(buffer, length, totalLength - length);
      if (read == -1)
      {
        lastReceiveTime = 0;
        throw new IOException("no more data");
      }
      length += read;
    }
  }
  /**
   * This method is called at the establishment of the session and can