From 22094368c2865dcfb6daf8366425212b721a4657 Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Thu, 05 Feb 2009 17:42:14 +0000
Subject: [PATCH] Merge ASN1 branch to trunk
---
opends/src/server/org/opends/server/protocols/internal/InternalLDAPInputStream.java | 180 +++++++++++++++++++++++++----------------------------------
1 files changed, 76 insertions(+), 104 deletions(-)
diff --git a/opends/src/server/org/opends/server/protocols/internal/InternalLDAPInputStream.java b/opends/src/server/org/opends/server/protocols/internal/InternalLDAPInputStream.java
index a8104ec..f2ab4f5 100644
--- a/opends/src/server/org/opends/server/protocols/internal/InternalLDAPInputStream.java
+++ b/opends/src/server/org/opends/server/protocols/internal/InternalLDAPInputStream.java
@@ -28,13 +28,15 @@
-import java.io.InputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.io.InputStream;
import java.util.concurrent.ArrayBlockingQueue;
+import org.opends.server.protocols.asn1.ASN1;
+import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.protocols.ldap.LDAPMessage;
-
+import org.opends.server.types.ByteSequenceReader;
+import org.opends.server.types.ByteStringBuilder;
/**
@@ -54,16 +56,22 @@
{
// The queue of LDAP messages providing the data to be made
// available to the client.
- private ArrayBlockingQueue<LDAPMessage> messageQueue;
+ private final ArrayBlockingQueue<LDAPMessage> messageQueue;
// Indicates whether this stream has been closed.
private boolean closed;
// The byte buffer with partial data to be written to the client.
- private ByteBuffer partialMessageBuffer;
+ private final ByteStringBuilder messageBuffer;
+
+ // The byte buffer reader.
+ private final ByteSequenceReader messageReader;
+
+ // The byte buffer writer.
+ private final ASN1Writer writer;
// The internal LDAP socket serviced by this input stream.
- private InternalLDAPSocket socket;
+ private final InternalLDAPSocket socket;
@@ -77,10 +85,11 @@
public InternalLDAPInputStream(InternalLDAPSocket socket)
{
this.socket = socket;
-
- messageQueue = new ArrayBlockingQueue<LDAPMessage>(10);
- partialMessageBuffer = null;
- closed = false;
+ this.messageQueue = new ArrayBlockingQueue<LDAPMessage>(10);
+ this.messageBuffer = new ByteStringBuilder();
+ this.messageReader = messageBuffer.asReader();
+ this.writer = ASN1.getWriter(messageBuffer);
+ this.closed = false;
}
@@ -136,17 +145,19 @@
*
* @return The number of bytes that can be read (or skipped over)
* from this input stream wihtout blocking.
+ * @throws IOException if an I/O error occurs.
*/
@Override()
- public synchronized int available()
+ public synchronized int available() throws IOException
{
- if (partialMessageBuffer == null)
+ if (messageReader.remaining() < 1)
{
LDAPMessage message = messageQueue.poll();
if ((message == null) || (message instanceof NullLDAPMessage))
{
- if (message instanceof NullLDAPMessage)
+ if (message != null)
{
+ messageQueue.clear();
closed = true;
}
@@ -154,15 +165,13 @@
}
else
{
- partialMessageBuffer =
- ByteBuffer.wrap(message.encode().encode());
- return partialMessageBuffer.remaining();
+ messageBuffer.clear();
+ messageReader.rewind();
+ message.write(writer);
}
}
- else
- {
- return partialMessageBuffer.remaining();
- }
+
+ return messageReader.remaining();
}
@@ -256,47 +265,39 @@
public synchronized int read()
throws IOException
{
- if (partialMessageBuffer != null)
+ if (messageReader.remaining() < 1)
{
- if (partialMessageBuffer.remaining() > 0)
+ LDAPMessage message;
+ try
{
- int i = (0xFF & partialMessageBuffer.get());
- if (partialMessageBuffer.remaining() == 0)
+ message = messageQueue.take();
+ }
+ catch(InterruptedException ie)
+ {
+ // Probably because a shutdown was started. EOF
+ message = new NullLDAPMessage();
+ }
+
+ if ((message == null) || (message instanceof NullLDAPMessage))
+ {
+ if (message instanceof NullLDAPMessage)
{
- partialMessageBuffer = null;
+ messageQueue.clear();
+ closed = true;
+ return -1;
}
- return i;
+ return 0;
}
else
{
- partialMessageBuffer = null;
+ messageBuffer.clear();
+ messageReader.rewind();
+ message.write(writer);
}
}
- if (closed)
- {
- return -1;
- }
-
- try
- {
- LDAPMessage message = messageQueue.take();
- if (message instanceof NullLDAPMessage)
- {
- messageQueue.clear();
- closed = true;
- return -1;
- }
-
- partialMessageBuffer =
- ByteBuffer.wrap(message.encode().encode());
- return (0xFF & partialMessageBuffer.get());
- }
- catch (Exception e)
- {
- throw new IOException(e.getMessage());
- }
+ return (0xFF & messageReader.get());
}
@@ -346,70 +347,41 @@
public synchronized int read(byte[] b, int off, int len)
throws IOException
{
- if (partialMessageBuffer != null)
+ if (messageReader.remaining() < 1)
{
- int remaining = partialMessageBuffer.remaining();
- if (remaining > 0)
+ LDAPMessage message;
+ try
{
- if (remaining <= len)
+ message = messageQueue.take();
+ }
+ catch(InterruptedException ie)
+ {
+ // Probably because a shutdown was started. EOF
+ message = new NullLDAPMessage();
+ }
+
+ if ((message == null) || (message instanceof NullLDAPMessage))
+ {
+ if (message instanceof NullLDAPMessage)
{
- // We can fit all the remaining data in the provided array,
- // so that's all we'll try to put in it.
- partialMessageBuffer.get(b, off, remaining);
- partialMessageBuffer = null;
- return remaining;
+ messageQueue.clear();
+ closed = true;
+ return -1;
}
- else
- {
- // The array is too small to hold the rest of the data, so
- // only take as much as we can.
- partialMessageBuffer.get(b, off, len);
- return len;
- }
+
+ return 0;
}
else
{
- partialMessageBuffer = null;
+ messageBuffer.clear();
+ messageReader.rewind();
+ message.write(writer);
}
}
- if (closed)
- {
- return -1;
- }
-
- try
- {
- LDAPMessage message = messageQueue.take();
- if (message instanceof NullLDAPMessage)
- {
- messageQueue.clear();
- closed = true;
- return -1;
- }
-
- byte[] encodedMessage = message.encode().encode();
- if (encodedMessage.length <= len)
- {
- // We can fit the entire message in the array.
- System.arraycopy(encodedMessage, 0, b, off,
- encodedMessage.length);
- return encodedMessage.length;
- }
- else
- {
- // We can only fit part of the message in the array,
- // so we need to save the rest for later.
- System.arraycopy(encodedMessage, 0, b, off, len);
- partialMessageBuffer = ByteBuffer.wrap(encodedMessage);
- partialMessageBuffer.position(len);
- return len;
- }
- }
- catch (Exception e)
- {
- throw new IOException(e.getMessage());
- }
+ int actualLen = Math.min(len, messageReader.remaining());
+ messageReader.get(b, off, actualLen);
+ return actualLen;
}
--
Gitblit v1.10.0