From 2bbfdcf54303286798114aee270531f65c9c3a7c Mon Sep 17 00:00:00 2001
From: abobrov <abobrov@localhost>
Date: Fri, 09 Oct 2009 01:20:46 +0000
Subject: [PATCH] - CMT improvements to parallel request and response processing.
---
opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java | 195 +++++++++++++++++++++---------------------------
1 files changed, 86 insertions(+), 109 deletions(-)
diff --git a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
index b14755b..f808e41 100644
--- a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
+++ b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -181,10 +181,7 @@
// The number of operations performed on this connection.
// Used to compare with the resource limits of the network group.
- private long operationsPerformed;
-
- // Lock on the number of operations
- private final Object operationsPerformedLock;
+ private final AtomicLong operationsPerformed;
// The port on the client from which this connection originated.
private final int clientPort;
@@ -216,10 +213,6 @@
// in progress.
private final Object opsInProgressLock;
- // The lock used to provide threadsafe access when sending data to the
- // client.
- private final Object transmitLock;
-
// The socket channel with which this client connection is associated.
private final SocketChannel clientChannel;
@@ -236,9 +229,7 @@
private ASN1ByteChannelReader asn1Reader;
- private ASN1Writer asn1Writer;
-
- private static int APPLICATION_BUFFER_SIZE = 4096;
+ private int APPLICATION_BUFFER_SIZE = 4096;
private final RedirectingByteChannel saslChannel;
private final RedirectingByteChannel tlsChannel;
@@ -246,6 +237,28 @@
private volatile ConnectionSecurityProvider tlsPendingProvider = null;
private volatile ConnectionSecurityProvider saslPendingProvider = null;
+ /**
+ * Thread local storage to provide each worker thread with its
+ * own ASN1Writer to allow for parallel, non-blocking encoding.
+ */
+ private final ThreadLocal<ASN1Writer> threadLocalASN1Writer =
+ new ThreadLocal<ASN1Writer>()
+ {
+ @Override
+ protected ASN1Writer initialValue()
+ {
+ if (isSecure())
+ {
+ int appBufSize = activeProvider.getAppBufSize();
+ return ASN1.getWriter(saslChannel, appBufSize);
+ }
+ else
+ {
+ return ASN1.getWriter(saslChannel,
+ APPLICATION_BUFFER_SIZE);
+ }
+ }
+ };
/**
* Creates a new LDAP client connection with the provided information.
@@ -270,7 +283,6 @@
this.clientChannel = clientChannel;
opsInProgressLock = new Object();
- transmitLock = new Object();
ldapVersion = 3;
requestHandler = null;
lastCompletionTime = new AtomicLong(TimeThread.getTime());
@@ -278,8 +290,7 @@
connectionValid = true;
disconnectRequested = false;
operationsInProgress = new ConcurrentHashMap<Integer, Operation>();
- operationsPerformed = 0;
- operationsPerformedLock = new Object();
+ operationsPerformed = new AtomicLong(0);
keepStats = connectionHandler.keepStats();
this.protocol = protocol;
writeSelector = new AtomicReference<Selector>();
@@ -299,6 +310,8 @@
this.useNanoTime=DirectoryServer.getUseNanoTime();
}
+ APPLICATION_BUFFER_SIZE = connectionHandler.getBufferSize();
+
tlsChannel =
RedirectingByteChannel.getRedirectingByteChannel(clientChannel);
saslChannel =
@@ -794,42 +807,24 @@
*/
public void sendLDAPMessage(LDAPMessage message)
{
- // Get the buffer used by this thread.
+ // Get the writer used by this thread.
+ ASN1Writer asn1Writer = threadLocalASN1Writer.get();
try
{
- // Make sure that we can only send one message at a time. This
- // locking will not have any impact on the ability to read
- // requests from the client.
- synchronized (transmitLock)
+ message.write(asn1Writer);
+ asn1Writer.flush();
+
+ if (debugEnabled())
{
- if (asn1Writer == null)
- {
- if (isSecure())
- {
- int appBufSize = activeProvider.getAppBufSize();
- asn1Writer = ASN1.getWriter(saslChannel, appBufSize);
- }
- else
- {
- asn1Writer =
- ASN1.getWriter(saslChannel, APPLICATION_BUFFER_SIZE);
- }
- }
+ TRACER.debugProtocolElement(DebugLogLevel.VERBOSE,
+ message.toString());
+ }
- message.write(asn1Writer);
- asn1Writer.flush();
- if(debugEnabled())
- {
- TRACER.debugProtocolElement(DebugLogLevel.VERBOSE,
- message.toString());
- }
-
- if (keepStats)
- {
- // TODO hard-coded for now, flush probably needs to
- // return how many bytes were flushed.
- statTracker.updateMessageWritten(message, 4096);
- }
+ if (keepStats)
+ {
+ // TODO hard-coded for now, flush probably needs to
+ // return how many bytes were flushed.
+ statTracker.updateMessageWritten(message, 4096);
}
}
catch (Exception e)
@@ -1078,9 +1073,12 @@
message);
}
+ // Add the operation to the list of operations in progress for
+ // this connection.
+ Operation op = operationsInProgress.putIfAbsent(messageID, operation);
+
// See if there is already an operation in progress with the
// same message ID. If so, then we can't allow it.
- Operation op = operationsInProgress.get(messageID);
if (op != null)
{
Message message =
@@ -1089,10 +1087,6 @@
message);
}
- // Add the operation to the list of operations in progress for
- // this connection.
- operationsInProgress.put(messageID, operation);
-
// Try to add the operation to the work queue,
// or run it synchronously (typically for the administration
// connector)
@@ -1399,12 +1393,19 @@
@Override
public long getNumberOfOperations()
{
- long tmpNumberOfOperations;
- synchronized (operationsPerformedLock)
- {
- tmpNumberOfOperations = operationsPerformed;
- }
- return tmpNumberOfOperations;
+ return operationsPerformed.get();
+ }
+
+
+
+ /**
+ * Returns the ASN1 reader for this connection.
+ *
+ * @return the ASN1 reader for this connection
+ */
+ protected ASN1ByteChannelReader getASN1Reader()
+ {
+ return asn1Reader;
}
@@ -1412,61 +1413,40 @@
/**
* Process data read.
*
- * @return {@code true} if this connection is still valid.
+ * @return number of bytes read if this connection is still valid
+ * or negative integer to indicate an error otherwise
*/
- public boolean processDataRead()
+ public int processDataRead()
{
- while (true)
+ if (bindOrStartTLSInProgress.get())
{
- if(bindOrStartTLSInProgress.get())
- {
- // We should wait for the bind or startTLS to finish before
- // reading any more data off the socket.
- return true;
- }
+ // We should wait for the bind or startTLS to finish before
+ // reading any more data off the socket.
+ return 0;
+ }
- try
+ try
+ {
+ int result = asn1Reader.processChannelData();
+ if (result < 0)
{
- int result = asn1Reader.processChannelData();
- if (result < 0)
- {
- // The connection has been closed by the client. Disconnect
- // and return.
- disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
- return false;
- }
- if (result == 0)
- {
- // We have read all the data that there is to read right now
- // (or there wasn't any in the first place). Just return and
- // wait for future notification.
- return true;
- }
- else
- {
- // Decode all complete elements from the last read
- while (asn1Reader.elementAvailable())
- {
- if (!processLDAPMessage(LDAPReader.readMessage(asn1Reader)))
- {
- // Fatal connection error - client disconnected.
- return false;
- }
- }
- }
+ // The connection has been closed by the client. Disconnect
+ // and return.
+ disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
+ return -1;
}
- catch (Exception e)
+ return result;
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
{
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- Message m =
- ERR_LDAP_CLIENT_DECODE_LDAP_MESSAGE_FAILED.get(String
- .valueOf(e));
- disconnect(DisconnectReason.PROTOCOL_ERROR, false, m);
- return false;
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
+ Message m =
+ ERR_LDAP_CLIENT_DECODE_LDAP_MESSAGE_FAILED.get(String.valueOf(e));
+ disconnect(DisconnectReason.PROTOCOL_ERROR, true, m);
+ return -1;
}
}
@@ -1485,17 +1465,14 @@
* error and the client has been disconnected as a result, or
* if the client unbound from the server.
*/
- private boolean processLDAPMessage(LDAPMessage message)
+ public boolean processLDAPMessage(LDAPMessage message)
{
if (keepStats)
{
statTracker.updateMessageRead(message);
this.getNetworkGroup().updateMessageRead(message);
}
- synchronized (operationsPerformedLock)
- {
- operationsPerformed++;
- }
+ operationsPerformed.getAndIncrement();
List<Control> opControls = message.getControls();
--
Gitblit v1.10.0