From 2bbfdcf54303286798114aee270531f65c9c3a7c Mon Sep 17 00:00:00 2001
From: abobrov <abobrov@localhost>
Date: Fri, 09 Oct 2009 01:20:46 +0000
Subject: [PATCH] - CMT improvements to parallel request and response processing.
---
opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java | 145 +++++++++++++++++++++++++++++++----------------
1 files changed, 95 insertions(+), 50 deletions(-)
diff --git a/opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java b/opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java
index 0301ec4..528679e 100644
--- a/opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java
+++ b/opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java
@@ -40,15 +40,16 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
+
import java.util.LinkedList;
import java.util.List;
-
import org.opends.messages.Message;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.ServerShutdownListener;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.ErrorLogger;
import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.protocols.asn1.ASN1ByteChannelReader;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DisconnectReason;
import org.opends.server.types.InitializationException;
@@ -71,16 +72,6 @@
*/
private static final DebugTracer TRACER = getTracer();
-
-
-
- /**
- * The buffer size in bytes to use when reading data from a client.
- */
- public static final int BUFFER_SIZE = 8192;
-
-
-
// Indicates whether the Directory Server is in the process of shutting down.
private volatile boolean shutdownRequested = false;
@@ -89,12 +80,17 @@
// The queue that will be used to hold the set of pending connections that
// need to be registered with the selector.
+ // TODO: revisit, see Issue 4202.
private List<LDAPClientConnection> pendingConnections =
new LinkedList<LDAPClientConnection>();
// Lock object for synchronizing access to the pending connections queue.
private final Object pendingConnectionsLock = new Object();
+ // The list of connections ready for request processing.
+ private LinkedList<LDAPClientConnection> readyConnections =
+ new LinkedList<LDAPClientConnection>();
+
// The selector that will be used to monitor the client connections.
private final Selector selector;
@@ -180,6 +176,89 @@
// loop, check for new requests, then check for new connections.
while (!shutdownRequested)
{
+ LDAPClientConnection readyConnection = null;
+ while ((readyConnection = readyConnections.poll()) != null)
+ {
+ try
+ {
+ ASN1ByteChannelReader asn1Reader = readyConnection.getASN1Reader();
+ boolean ldapMessageProcessed = false;
+ while (true)
+ {
+ if (asn1Reader.elementAvailable())
+ {
+ if (!ldapMessageProcessed)
+ {
+ if (readyConnection.processLDAPMessage(
+ LDAPReader.readMessage(asn1Reader)))
+ {
+ ldapMessageProcessed = true;
+ }
+ else
+ {
+ break;
+ }
+ }
+ else
+ {
+ readyConnections.add(readyConnection);
+ break;
+ }
+ }
+ else
+ {
+ if (readyConnection.processDataRead() <= 0)
+ {
+ break;
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ }
+
+ // Check to see if we have any pending connections that need to be
+ // registered with the selector.
+ List<LDAPClientConnection> tmp = null;
+ synchronized (pendingConnectionsLock)
+ {
+ if (!pendingConnections.isEmpty())
+ {
+ tmp = pendingConnections;
+ pendingConnections = new LinkedList<LDAPClientConnection>();
+ }
+ }
+
+ if (tmp != null)
+ {
+ for (LDAPClientConnection c : tmp)
+ {
+ try
+ {
+ SocketChannel socketChannel = c.getSocketChannel();
+ socketChannel.configureBlocking(false);
+ socketChannel.register(selector, SelectionKey.OP_READ, c);
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+
+ c.disconnect(DisconnectReason.SERVER_ERROR, true,
+ ERR_LDAP_REQHANDLER_CANNOT_REGISTER.get(handlerName,
+ String.valueOf(e)));
+ }
+ }
+ }
+
// Create a copy of the selection keys which can be used in a
// thread-safe manner by getClientConnections. This copy is only
// updated once per loop, so may not be accurate.
@@ -226,10 +305,14 @@
try
{
- if (! clientConnection.processDataRead())
+ int readResult = clientConnection.processDataRead();
+ if (readResult < 0)
{
key.cancel();
}
+ if (readResult > 0) {
+ readyConnections.add(clientConnection);
+ }
}
catch (Exception e)
{
@@ -307,43 +390,6 @@
}
}
}
-
-
- // Check to see if we have any pending connections that need to be
- // registered with the selector.
- List<LDAPClientConnection> tmp = null;
- synchronized (pendingConnectionsLock)
- {
- if (!pendingConnections.isEmpty())
- {
- tmp = pendingConnections;
- pendingConnections = new LinkedList<LDAPClientConnection>();
- }
- }
-
- if (tmp != null)
- {
- for (LDAPClientConnection c : tmp)
- {
- try
- {
- SocketChannel socketChannel = c.getSocketChannel();
- socketChannel.configureBlocking(false);
- socketChannel.register(selector, SelectionKey.OP_READ, c);
- }
- catch (Exception e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
-
- c.disconnect(DisconnectReason.SERVER_ERROR, true,
- ERR_LDAP_REQHANDLER_CANNOT_REGISTER.get(handlerName,
- String.valueOf(e)));
- }
- }
- }
}
// Disconnect all active connections.
@@ -439,7 +485,6 @@
return false;
}
-
// Try to add the new connection to the queue. If it succeeds, then wake
// up the selector so it will be picked up right away. Otherwise,
// disconnect the client.
--
Gitblit v1.10.0