From 2bbfdcf54303286798114aee270531f65c9c3a7c Mon Sep 17 00:00:00 2001
From: abobrov <abobrov@localhost>
Date: Fri, 09 Oct 2009 01:20:46 +0000
Subject: [PATCH] - CMT improvements to parallel request and response processing.
---
opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java | 51 ++++++++++++++++++++++++++++++++++++++++++++++++---
1 files changed, 48 insertions(+), 3 deletions(-)
diff --git a/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java b/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
index 98c6aff..4683e50 100644
--- a/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
+++ b/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
@@ -32,6 +32,7 @@
import java.nio.channels.WritableByteChannel;
import java.io.OutputStream;
import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
/**
* This class is for writing ASN.1 elements directly to an
@@ -50,6 +51,9 @@
// The NIO ByteStringBuilder to write to.
private final ByteBuffer byteBuffer;
+ // The lock to ensure atomic message flush.
+ private final ReentrantLock flushLock;
+
/**
* An adaptor class provides a streaming interface to write to a
* NIO ByteBuffer. This class is also responsible for writing
@@ -65,7 +69,7 @@
if(!byteBuffer.hasRemaining())
{
// No more space left in the buffer, send out to the channel.
- ASN1ByteChannelWriter.this.flush();
+ ASN1ByteChannelWriter.this.lockAndFlush();
}
byteBuffer.put((byte)i);
}
@@ -95,7 +99,8 @@
{
byteBuffer.put(bytes, i + i1 - bytesToWrite, len);
bytesToWrite -= len;
- ASN1ByteChannelWriter.this.flush();
+ // No more space left in the buffer, send out to the channel.
+ ASN1ByteChannelWriter.this.lockAndFlush();
}
else
{
@@ -117,6 +122,7 @@
{
this.byteChannel = byteChannel;
this.byteBuffer = ByteBuffer.allocate(writeBufferSize);
+ this.flushLock = new ReentrantLock(false);
ByteBufferOutputStream bufferStream = new ByteBufferOutputStream();
this.writer = new ASN1OutputStreamWriter(bufferStream);
@@ -298,13 +304,52 @@
/**
* Flush the entire contents of the NIO ByteBuffer out to the
- * channel.
+ * channel. Note that the caller should only invoke this
+ * method when it has finished writing the entire message and
+ * not earlier. Otherwise, calling this method prematurely
+ * while sharing this writer among several threads can cause
+ * messages to interleave. While it is possible to ensure the
+ * safety by either single threaded usage or using external
+ * locking its generally not advised to do so and the caller
+ * should instead obey no flush until the entire message is
+ * written rule when using this writer.
*
* @throws IOException If an error occurs while flushing.
*/
public void flush() throws IOException
{
byteBuffer.flip();
+ try
+ {
+ if (!flushLock.isHeldByCurrentThread())
+ {
+ flushLock.lock();
+ }
+ while(byteBuffer.hasRemaining())
+ {
+ byteChannel.write(byteBuffer);
+ }
+ }
+ finally
+ {
+ flushLock.unlock();
+ }
+ byteBuffer.clear();
+ }
+
+ /**
+ * Take the flush lock and flush the entire contents of
+ * the NIO ByteBuffer out to the channel.
+ *
+ * @throws IOException If an error occurs while flushing.
+ */
+ private void lockAndFlush() throws IOException
+ {
+ byteBuffer.flip();
+ if (!flushLock.isHeldByCurrentThread())
+ {
+ flushLock.lock();
+ }
while(byteBuffer.hasRemaining())
{
byteChannel.write(byteBuffer);
--
Gitblit v1.10.0