From 1e9516a3f84cb1a524c3b4446ef5f7260f6e51d1 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 15 Jun 2012 22:47:49 +0000
Subject: [PATCH] Fix OPENDJ-524: CME in LDAPClientConnection when writing many large responses concurrently to the same connection

---
 opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java |  172 +++++++++++++++++++++++++++++++--------------------------
 1 files changed, 93 insertions(+), 79 deletions(-)

diff --git a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
index 8a460d4..6c9dc22 100644
--- a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
+++ b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -52,6 +52,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
@@ -154,6 +156,9 @@
    */
   private class TimeoutWriteByteChannel implements ByteChannel
   {
+    // Synchronize concurrent writes to the same connection.
+    private final Lock writeLock = new ReentrantLock();
+
     public int read(ByteBuffer byteBuffer) throws IOException
     {
       return clientChannel.read(byteBuffer);
@@ -169,104 +174,113 @@
       clientChannel.close();
     }
 
+
+
     public int write(ByteBuffer byteBuffer) throws IOException
     {
-      int bytesToWrite = byteBuffer.remaining();
-      clientChannel.write(byteBuffer);
-      if(!byteBuffer.hasRemaining())
-      {
-        return bytesToWrite;
-      }
-      long startTime = System.currentTimeMillis();
-      long waitTime = getMaxBlockedWriteTimeLimit();
-      if (waitTime <= 0)
-      {
-        // We won't support an infinite time limit, so fall back to using
-        // five minutes, which is a very long timeout given that we're
-        // blocking a worker thread.
-        waitTime = 300000L;
-      }
-
-      long stopTime = startTime + waitTime;
-
-      Selector selector = getWriteSelector();
-      if (selector == null)
-      {
-        // The client connection does not provide a selector, so we'll
-        // fall back to a more inefficient way that will work without a
-        // selector.
-        while (byteBuffer.hasRemaining()
-               && (System.currentTimeMillis() < stopTime))
-        {
-          if (clientChannel.write(byteBuffer) < 0)
-          {
-            // The client connection has been closed.
-            throw new ClosedChannelException();
-          }
-        }
-
-        if (byteBuffer.hasRemaining())
-        {
-          // If we've gotten here, then the write timed out.
-          throw new ClosedChannelException();
-        }
-
-        return bytesToWrite;
-      }
-
-      // Register with the selector for handling write operations.
-      SelectionKey key =
-          clientChannel.register(selector, SelectionKey.OP_WRITE);
-
+      writeLock.lock();
       try
       {
-        selector.select(waitTime);
-        while (byteBuffer.hasRemaining())
+        int bytesToWrite = byteBuffer.remaining();
+        clientChannel.write(byteBuffer);
+        if (!byteBuffer.hasRemaining())
         {
-          long currentTime = System.currentTimeMillis();
-          if (currentTime >= stopTime)
-          {
-            // We've been blocked for too long.
-            throw new ClosedChannelException();
-          }
-          else
-          {
-            waitTime = stopTime - currentTime;
-          }
+          return bytesToWrite;
+        }
 
-          Iterator<SelectionKey> iterator =
-              selector.selectedKeys().iterator();
-          while (iterator.hasNext())
+        long startTime = System.currentTimeMillis();
+        long waitTime = getMaxBlockedWriteTimeLimit();
+        if (waitTime <= 0)
+        {
+          // We won't support an infinite time limit, so fall back to using
+          // five minutes, which is a very long timeout given that we're
+          // blocking a worker thread.
+          waitTime = 300000L;
+        }
+        long stopTime = startTime + waitTime;
+
+        Selector selector = getWriteSelector();
+        if (selector == null)
+        {
+          // The client connection does not provide a selector, so we'll
+          // fall back to a more inefficient way that will work without a
+          // selector.
+          while (byteBuffer.hasRemaining()
+              && (System.currentTimeMillis() < stopTime))
           {
-            SelectionKey k = iterator.next();
-            if (k.isWritable())
+            if (clientChannel.write(byteBuffer) < 0)
             {
-              int bytesWritten = clientChannel.write(byteBuffer);
-              if (bytesWritten < 0)
-              {
-                // The client connection has been closed.
-                throw new ClosedChannelException();
-              }
-
-              iterator.remove();
+              // The client connection has been closed.
+              throw new ClosedChannelException();
             }
           }
 
           if (byteBuffer.hasRemaining())
           {
-            selector.select(waitTime);
+            // If we've gotten here, then the write timed out.
+            throw new ClosedChannelException();
           }
+
+          return bytesToWrite;
         }
 
-        return bytesToWrite;
+        // Register with the selector for handling write operations.
+        SelectionKey key = clientChannel.register(selector,
+            SelectionKey.OP_WRITE);
+        try
+        {
+          selector.select(waitTime);
+          while (byteBuffer.hasRemaining())
+          {
+            long currentTime = System.currentTimeMillis();
+            if (currentTime >= stopTime)
+            {
+              // We've been blocked for too long.
+              throw new ClosedChannelException();
+            }
+            else
+            {
+              waitTime = stopTime - currentTime;
+            }
+
+            Iterator<SelectionKey> iterator = selector.selectedKeys()
+                .iterator();
+            while (iterator.hasNext())
+            {
+              SelectionKey k = iterator.next();
+              if (k.isWritable())
+              {
+                int bytesWritten = clientChannel.write(byteBuffer);
+                if (bytesWritten < 0)
+                {
+                  // The client connection has been closed.
+                  throw new ClosedChannelException();
+                }
+
+                iterator.remove();
+              }
+            }
+
+            if (byteBuffer.hasRemaining())
+            {
+              selector.select(waitTime);
+            }
+          }
+
+          return bytesToWrite;
+        }
+        finally
+        {
+          if (key.isValid())
+          {
+            key.cancel();
+            selector.selectNow();
+          }
+        }
       }
       finally
       {
-        if (key.isValid())
-        {
-          key.cancel();
-          selector.selectNow();
-        }
+        writeLock.unlock();
       }
     }
   }

--
Gitblit v1.10.0