mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
16.47.2012 1e9516a3f84cb1a524c3b4446ef5f7260f6e51d1
Fix OPENDJ-524: CME in LDAPClientConnection when writing many large responses concurrently to the same connection
1 files modified
172 ■■■■ changed files
opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java 172 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -52,6 +52,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -154,6 +156,9 @@
   */
  private class TimeoutWriteByteChannel implements ByteChannel
  {
    // Synchronize concurrent writes to the same connection.
    private final Lock writeLock = new ReentrantLock();
    public int read(ByteBuffer byteBuffer) throws IOException
    {
      return clientChannel.read(byteBuffer);
@@ -169,104 +174,113 @@
      clientChannel.close();
    }
    public int write(ByteBuffer byteBuffer) throws IOException
    {
      int bytesToWrite = byteBuffer.remaining();
      clientChannel.write(byteBuffer);
      if(!byteBuffer.hasRemaining())
      {
        return bytesToWrite;
      }
      long startTime = System.currentTimeMillis();
      long waitTime = getMaxBlockedWriteTimeLimit();
      if (waitTime <= 0)
      {
        // We won't support an infinite time limit, so fall back to using
        // five minutes, which is a very long timeout given that we're
        // blocking a worker thread.
        waitTime = 300000L;
      }
      long stopTime = startTime + waitTime;
      Selector selector = getWriteSelector();
      if (selector == null)
      {
        // The client connection does not provide a selector, so we'll
        // fall back to a more inefficient way that will work without a
        // selector.
        while (byteBuffer.hasRemaining()
               && (System.currentTimeMillis() < stopTime))
        {
          if (clientChannel.write(byteBuffer) < 0)
          {
            // The client connection has been closed.
            throw new ClosedChannelException();
          }
        }
        if (byteBuffer.hasRemaining())
        {
          // If we've gotten here, then the write timed out.
          throw new ClosedChannelException();
        }
        return bytesToWrite;
      }
      // Register with the selector for handling write operations.
      SelectionKey key =
          clientChannel.register(selector, SelectionKey.OP_WRITE);
      writeLock.lock();
      try
      {
        selector.select(waitTime);
        while (byteBuffer.hasRemaining())
        int bytesToWrite = byteBuffer.remaining();
        clientChannel.write(byteBuffer);
        if (!byteBuffer.hasRemaining())
        {
          long currentTime = System.currentTimeMillis();
          if (currentTime >= stopTime)
          {
            // We've been blocked for too long.
            throw new ClosedChannelException();
          }
          else
          {
            waitTime = stopTime - currentTime;
          }
          return bytesToWrite;
        }
          Iterator<SelectionKey> iterator =
              selector.selectedKeys().iterator();
          while (iterator.hasNext())
        long startTime = System.currentTimeMillis();
        long waitTime = getMaxBlockedWriteTimeLimit();
        if (waitTime <= 0)
        {
          // We won't support an infinite time limit, so fall back to using
          // five minutes, which is a very long timeout given that we're
          // blocking a worker thread.
          waitTime = 300000L;
        }
        long stopTime = startTime + waitTime;
        Selector selector = getWriteSelector();
        if (selector == null)
        {
          // The client connection does not provide a selector, so we'll
          // fall back to a more inefficient way that will work without a
          // selector.
          while (byteBuffer.hasRemaining()
              && (System.currentTimeMillis() < stopTime))
          {
            SelectionKey k = iterator.next();
            if (k.isWritable())
            if (clientChannel.write(byteBuffer) < 0)
            {
              int bytesWritten = clientChannel.write(byteBuffer);
              if (bytesWritten < 0)
              {
                // The client connection has been closed.
                throw new ClosedChannelException();
              }
              iterator.remove();
              // The client connection has been closed.
              throw new ClosedChannelException();
            }
          }
          if (byteBuffer.hasRemaining())
          {
            selector.select(waitTime);
            // If we've gotten here, then the write timed out.
            throw new ClosedChannelException();
          }
          return bytesToWrite;
        }
        return bytesToWrite;
        // Register with the selector for handling write operations.
        SelectionKey key = clientChannel.register(selector,
            SelectionKey.OP_WRITE);
        try
        {
          selector.select(waitTime);
          while (byteBuffer.hasRemaining())
          {
            long currentTime = System.currentTimeMillis();
            if (currentTime >= stopTime)
            {
              // We've been blocked for too long.
              throw new ClosedChannelException();
            }
            else
            {
              waitTime = stopTime - currentTime;
            }
            Iterator<SelectionKey> iterator = selector.selectedKeys()
                .iterator();
            while (iterator.hasNext())
            {
              SelectionKey k = iterator.next();
              if (k.isWritable())
              {
                int bytesWritten = clientChannel.write(byteBuffer);
                if (bytesWritten < 0)
                {
                  // The client connection has been closed.
                  throw new ClosedChannelException();
                }
                iterator.remove();
              }
            }
            if (byteBuffer.hasRemaining())
            {
              selector.select(waitTime);
            }
          }
          return bytesToWrite;
        }
        finally
        {
          if (key.isValid())
          {
            key.cancel();
            selector.selectNow();
          }
        }
      }
      finally
      {
        if (key.isValid())
        {
          key.cancel();
          selector.selectNow();
        }
        writeLock.unlock();
      }
    }
  }