mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.22.2014 387f7110b6201154b3b2bf1f66ca87790001e66f
opends/src/server/org/opends/server/replication/protocol/Session.java
@@ -22,21 +22,16 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.protocol;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.*;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -48,7 +43,8 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.util.StaticUtils;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class defines a replication session using TLS.
@@ -86,7 +82,7 @@
   */
  private final Object stateLock = new Object();
  private volatile boolean closeInitiated = false;
  private Throwable sessionError = null;
  private Throwable sessionError;
  /**
   * Publish guarded by publishLock: use a full lock here so that we can
@@ -208,18 +204,16 @@
    }
    // V4 protocol introduces a StopMsg to properly end communications.
    if (localSessionError == null)
    if (localSessionError == null
        && protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      try
      {
        try
        {
          publish(new StopMsg());
        }
        catch (final IOException ignored)
        {
          // Ignore errors on close.
        }
        publish(new StopMsg());
      }
      catch (final IOException ignored)
      {
        // Ignore errors on close.
      }
    }
@@ -411,50 +405,20 @@
  {
    try
    {
      // Read the first 8 bytes containing the packet length.
      int length = 0;
      /*
       * Let's start the stop-watch before waiting on read for the heartbeat
       * check to be operational.
       */
      lastReceiveTime = System.currentTimeMillis();
      while (length < 8)
      {
        final int read = input.read(rcvLengthBuf, length, 8 - length);
        if (read == -1)
        {
          lastReceiveTime = 0;
          throw new IOException("no more data");
        }
        else
        {
          length += read;
        }
      }
      final int totalLength = Integer.parseInt(new String(
          rcvLengthBuf), 16);
      // Read the first 8 bytes containing the packet length.
      read(rcvLengthBuf);
      final int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
      try
      {
        length = 0;
        final byte[] buffer = new byte[totalLength];
        while (length < totalLength)
        {
          final int read = input.read(buffer, length, totalLength
              - length);
          if (read == -1)
          {
            lastReceiveTime = 0;
            throw new IOException("no more data");
          }
          else
          {
            length += read;
          }
        }
        read(buffer);
        /*
         * We do not want the heartbeat to close the session when we are
@@ -491,6 +455,21 @@
    }
  }
  private void read(byte[] buffer) throws IOException
  {
    final int totalLength = buffer.length;
    int length = 0;
    while (length < totalLength)
    {
      final int read = input.read(buffer, length, totalLength - length);
      if (read == -1)
      {
        lastReceiveTime = 0;
        throw new IOException("no more data");
      }
      length += read;
    }
  }
  /**
   * This method is called at the establishment of the session and can