From 1e9516a3f84cb1a524c3b4446ef5f7260f6e51d1 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 15 Jun 2012 22:47:49 +0000
Subject: [PATCH] Fix OPENDJ-524: CME in LDAPClientConnection when writing many large responses concurrently to the same connection
---
opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java | 172 +++++++++++++++++++++++++++++++--------------------------
1 files changed, 93 insertions(+), 79 deletions(-)
diff --git a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
index 8a460d4..6c9dc22 100644
--- a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
+++ b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -52,6 +52,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -154,6 +156,9 @@
*/
private class TimeoutWriteByteChannel implements ByteChannel
{
+ // Synchronize concurrent writes to the same connection.
+ private final Lock writeLock = new ReentrantLock();
+
public int read(ByteBuffer byteBuffer) throws IOException
{
return clientChannel.read(byteBuffer);
@@ -169,104 +174,113 @@
clientChannel.close();
}
+
+
public int write(ByteBuffer byteBuffer) throws IOException
{
- int bytesToWrite = byteBuffer.remaining();
- clientChannel.write(byteBuffer);
- if(!byteBuffer.hasRemaining())
- {
- return bytesToWrite;
- }
- long startTime = System.currentTimeMillis();
- long waitTime = getMaxBlockedWriteTimeLimit();
- if (waitTime <= 0)
- {
- // We won't support an infinite time limit, so fall back to using
- // five minutes, which is a very long timeout given that we're
- // blocking a worker thread.
- waitTime = 300000L;
- }
-
- long stopTime = startTime + waitTime;
-
- Selector selector = getWriteSelector();
- if (selector == null)
- {
- // The client connection does not provide a selector, so we'll
- // fall back to a more inefficient way that will work without a
- // selector.
- while (byteBuffer.hasRemaining()
- && (System.currentTimeMillis() < stopTime))
- {
- if (clientChannel.write(byteBuffer) < 0)
- {
- // The client connection has been closed.
- throw new ClosedChannelException();
- }
- }
-
- if (byteBuffer.hasRemaining())
- {
- // If we've gotten here, then the write timed out.
- throw new ClosedChannelException();
- }
-
- return bytesToWrite;
- }
-
- // Register with the selector for handling write operations.
- SelectionKey key =
- clientChannel.register(selector, SelectionKey.OP_WRITE);
-
+ writeLock.lock();
try
{
- selector.select(waitTime);
- while (byteBuffer.hasRemaining())
+ int bytesToWrite = byteBuffer.remaining();
+ clientChannel.write(byteBuffer);
+ if (!byteBuffer.hasRemaining())
{
- long currentTime = System.currentTimeMillis();
- if (currentTime >= stopTime)
- {
- // We've been blocked for too long.
- throw new ClosedChannelException();
- }
- else
- {
- waitTime = stopTime - currentTime;
- }
+ return bytesToWrite;
+ }
- Iterator<SelectionKey> iterator =
- selector.selectedKeys().iterator();
- while (iterator.hasNext())
+ long startTime = System.currentTimeMillis();
+ long waitTime = getMaxBlockedWriteTimeLimit();
+ if (waitTime <= 0)
+ {
+ // We won't support an infinite time limit, so fall back to using
+ // five minutes, which is a very long timeout given that we're
+ // blocking a worker thread.
+ waitTime = 300000L;
+ }
+ long stopTime = startTime + waitTime;
+
+ Selector selector = getWriteSelector();
+ if (selector == null)
+ {
+ // The client connection does not provide a selector, so we'll
+ // fall back to a more inefficient way that will work without a
+ // selector.
+ while (byteBuffer.hasRemaining()
+ && (System.currentTimeMillis() < stopTime))
{
- SelectionKey k = iterator.next();
- if (k.isWritable())
+ if (clientChannel.write(byteBuffer) < 0)
{
- int bytesWritten = clientChannel.write(byteBuffer);
- if (bytesWritten < 0)
- {
- // The client connection has been closed.
- throw new ClosedChannelException();
- }
-
- iterator.remove();
+ // The client connection has been closed.
+ throw new ClosedChannelException();
}
}
if (byteBuffer.hasRemaining())
{
- selector.select(waitTime);
+ // If we've gotten here, then the write timed out.
+ throw new ClosedChannelException();
}
+
+ return bytesToWrite;
}
- return bytesToWrite;
+ // Register with the selector for handling write operations.
+ SelectionKey key = clientChannel.register(selector,
+ SelectionKey.OP_WRITE);
+ try
+ {
+ selector.select(waitTime);
+ while (byteBuffer.hasRemaining())
+ {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime >= stopTime)
+ {
+ // We've been blocked for too long.
+ throw new ClosedChannelException();
+ }
+ else
+ {
+ waitTime = stopTime - currentTime;
+ }
+
+ Iterator<SelectionKey> iterator = selector.selectedKeys()
+ .iterator();
+ while (iterator.hasNext())
+ {
+ SelectionKey k = iterator.next();
+ if (k.isWritable())
+ {
+ int bytesWritten = clientChannel.write(byteBuffer);
+ if (bytesWritten < 0)
+ {
+ // The client connection has been closed.
+ throw new ClosedChannelException();
+ }
+
+ iterator.remove();
+ }
+ }
+
+ if (byteBuffer.hasRemaining())
+ {
+ selector.select(waitTime);
+ }
+ }
+
+ return bytesToWrite;
+ }
+ finally
+ {
+ if (key.isValid())
+ {
+ key.cancel();
+ selector.selectNow();
+ }
+ }
}
finally
{
- if (key.isValid())
- {
- key.cancel();
- selector.selectNow();
- }
+ writeLock.unlock();
}
}
}
--
Gitblit v1.10.0