From 2bbfdcf54303286798114aee270531f65c9c3a7c Mon Sep 17 00:00:00 2001
From: abobrov <abobrov@localhost>
Date: Fri, 09 Oct 2009 01:20:46 +0000
Subject: [PATCH] - CMT improvements to parallel request and response processing.

---
 opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java |   51 ++++++++++++++++++++++++++++++++++++++++++++++++---
 1 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java b/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
index 98c6aff..4683e50 100644
--- a/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
+++ b/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
@@ -32,6 +32,7 @@
 import java.nio.channels.WritableByteChannel;
 import java.io.OutputStream;
 import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * This class is for writing ASN.1 elements directly to an
@@ -50,6 +51,9 @@
   // The NIO ByteStringBuilder to write to.
   private final ByteBuffer byteBuffer;
 
+  // The lock to ensure atomic message flush.
+  private final ReentrantLock flushLock;
+
   /**
    * An adaptor class provides a streaming interface to write to a
    * NIO ByteBuffer. This class is also responsible for writing
@@ -65,7 +69,7 @@
       if(!byteBuffer.hasRemaining())
       {
         // No more space left in the buffer, send out to the channel.
-        ASN1ByteChannelWriter.this.flush();
+        ASN1ByteChannelWriter.this.lockAndFlush();
       }
       byteBuffer.put((byte)i);
     }
@@ -95,7 +99,8 @@
         {
           byteBuffer.put(bytes, i + i1 - bytesToWrite, len);
           bytesToWrite -= len;
-          ASN1ByteChannelWriter.this.flush();
+          // No more space left in the buffer, send out to the channel.
+          ASN1ByteChannelWriter.this.lockAndFlush();
         }
         else
         {
@@ -117,6 +122,7 @@
   {
     this.byteChannel = byteChannel;
     this.byteBuffer = ByteBuffer.allocate(writeBufferSize);
+    this.flushLock = new ReentrantLock(false);
 
     ByteBufferOutputStream bufferStream = new ByteBufferOutputStream();
     this.writer = new ASN1OutputStreamWriter(bufferStream);
@@ -298,13 +304,52 @@
 
   /**
    * Flush the entire contents of the NIO ByteBuffer out to the
-   * channel.
+   * channel.  Note that the caller should only invoke this
+   * method when it has finished writing the entire message and
+   * not earlier. Otherwise, calling this method prematurely
+   * while sharing this writer among several threads can cause
+   * messages to interleave. While it is possible to ensure the
+   * safety by either single threaded usage or using external
+   * locking its generally not advised to do so and the caller
+   * should instead obey no flush until the entire message is
+   * written rule when using this writer.
    *
    * @throws IOException If an error occurs while flushing.
    */
   public void flush() throws IOException
   {
     byteBuffer.flip();
+    try
+    {
+      if (!flushLock.isHeldByCurrentThread())
+      {
+        flushLock.lock();
+      }
+      while(byteBuffer.hasRemaining())
+      {
+        byteChannel.write(byteBuffer);
+      }
+    }
+    finally
+    {
+      flushLock.unlock();
+    }
+    byteBuffer.clear();
+  }
+
+  /**
+   * Take the flush lock and flush the entire contents of
+   * the NIO ByteBuffer out to the channel.
+   *
+   * @throws IOException If an error occurs while flushing.
+   */
+  private void lockAndFlush() throws IOException
+  {
+    byteBuffer.flip();
+    if (!flushLock.isHeldByCurrentThread())
+    {
+      flushLock.lock();
+    }
     while(byteBuffer.hasRemaining())
     {
       byteChannel.write(byteBuffer);

--
Gitblit v1.10.0