mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
07.02.2013 42dd3533f278a8d66edae0fa08d4a8abfb5d6835
Partial fix for OPENDJ-951: Reduce size and frequency of replication MonitorMsg

* use single TCP packet where possible for replication messages
* reduce frequency of socket reads.
1 files modified
100 ■■■■ changed files
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java 100 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -33,6 +33,8 @@
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -45,6 +47,7 @@
import javax.net.ssl.SSLSocket;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.util.StaticUtils;
@@ -77,21 +80,38 @@
   */
  private volatile long lastReceiveTime = 0;
  // Close and error guarded by stateLock: use a different lock to publish since
  // publishing can block, and we don't want to block while closing failed
  // connections.
  /*
   * Close and error guarded by stateLock: use a different lock to publish since
   * publishing can block, and we don't want to block while closing failed
   * connections.
   */
  private final Object stateLock = new Object();
  private boolean closeInitiated = false;
  private Throwable sessionError = null;
  // Publish guarded by publishLock: use a full lock here so that we can
  // optionally publish StopMsg during close.
  /*
   * Publish guarded by publishLock: use a full lock here so that we can
   * optionally publish StopMsg during close.
   */
  private final Lock publishLock = new ReentrantLock();
  // Does not need protecting: updated only during single threaded handshake.
  /*
   * These do not need synchronization because they are only modified during the
   * initial single threaded handshake.
   */
  private short protocolVersion = ProtocolVersion.getCurrentVersion();
  private InputStream input;
  private OutputStream output;
  private boolean isEncrypted = true; // Initially encrypted.
  /*
   * Use a buffered input stream to avoid too many system calls.
   */
  private BufferedInputStream input;
  /*
   * Use a buffered output stream in order to combine message length and content
   * into a single TCP packet if possible.
   */
  private BufferedOutputStream output;
@@ -121,8 +141,8 @@
    this.secureSocket = secureSocket;
    this.plainInput = plainSocket.getInputStream();
    this.plainOutput = plainSocket.getOutputStream();
    this.input = secureSocket.getInputStream();
    this.output = secureSocket.getOutputStream();
    this.input = new BufferedInputStream(secureSocket.getInputStream());
    this.output = new BufferedOutputStream(secureSocket.getOutputStream());
    this.readableRemoteAddress = plainSocket.getRemoteSocketAddress()
        .toString();
    this.remoteAddress = plainSocket.getInetAddress().getHostAddress();
@@ -197,23 +217,7 @@
      }
    }
    try
    {
      plainSocket.close();
    }
    catch (final IOException ignored)
    {
      // Ignore errors on close.
    }
    try
    {
      secureSocket.close();
    }
    catch (final IOException ignored)
    {
      // Ignore errors on close.
    }
    StaticUtils.close(plainSocket, secureSocket);
  }
@@ -297,7 +301,7 @@
  @Override
  public boolean isEncrypted()
  {
    return input != plainInput;
    return isEncrypted;
  }
@@ -327,6 +331,10 @@
    publishLock.lock();
    try
    {
      /*
       * The buffered output stream ensures that the message is usually sent as
       * a single TCP packet.
       */
      output.write(sendLengthBuf);
      output.write(buffer);
      output.flush();
@@ -358,9 +366,10 @@
      // Read the first 8 bytes containing the packet length.
      int length = 0;
      // Let's start the stop-watch before waiting on read for the heartbeat
      // check
      // to be operational.
      /*
       * Let's start the stop-watch before waiting on read for the heartbeat
       * check to be operational.
       */
      lastReceiveTime = System.currentTimeMillis();
      while (length < 8)
@@ -398,8 +407,11 @@
            length += read;
          }
        }
        // We do not want the heartbeat to close the session when we are
        // processing a message even a time consuming one.
        /*
         * We do not want the heartbeat to close the session when we are
         * processing a message even a time consuming one.
         */
        lastReceiveTime = 0;
        return ReplicationMsg.generateMsg(buffer, protocolVersion);
      }
@@ -462,23 +474,19 @@
  @Override
  public void stopEncryption()
  {
    // The secure socket has been configured not to auto close the underlying
    // plain socket. We should close it here and properly tear down the SSL
    // session, but this is not compatible with the existing protocol.
    /*
     * The secure socket has been configured not to auto close the underlying
     * plain socket. We should close it here and properly tear down the SSL
     * session, but this is not compatible with the existing protocol.
     */
    if (false)
    {
      try
      {
        secureSocket.close();
      }
      catch (IOException ignored)
      {
        // Ignore.
      }
      StaticUtils.close(secureSocket);
    }
    input = plainInput;
    output = plainOutput;
    input = new BufferedInputStream(plainInput);
    output = new BufferedOutputStream(plainOutput);
    isEncrypted = false;
  }