From 9ff69f265dae8f0647fb9c204fda070eafe25613 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 13 Jun 2012 21:05:11 +0000
Subject: [PATCH] Fix OPENDJ-520: Worker threads are too greedy when caching memory used for encoding/decoding entries and protocol messages
---
opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java | 147 +++++++++++++++++++++++++-----------------------
1 files changed, 76 insertions(+), 71 deletions(-)
diff --git a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
index 3af5e14..8a460d4 100644
--- a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
+++ b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -31,6 +31,7 @@
import static org.opends.messages.CoreMessages.ERR_ENQUEUE_BIND_IN_PROGRESS;
import static org.opends.messages.ProtocolMessages.*;
+import static org.opends.server.core.DirectoryServer.getMaxInternalBufferSize;
import static org.opends.server.loggers.AccessLogger.logDisconnect;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
@@ -48,11 +49,9 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -79,10 +78,10 @@
* connection handler and have its requests decoded by an LDAP request
* handler.
*/
-
-public class LDAPClientConnection extends ClientConnection implements
+public final class LDAPClientConnection extends ClientConnection implements
TLSCapableConnection
{
+
/**
* A runnable whose task is to close down all IO related channels
* associated with a client connection after a small delay.
@@ -194,8 +193,8 @@
if (selector == null)
{
// The client connection does not provide a selector, so we'll
- // fall back
- // to a more inefficient way that will work without a selector.
+ // fall back to a more inefficient way that will work without a
+ // selector.
while (byteBuffer.hasRemaining()
&& (System.currentTimeMillis() < stopTime))
{
@@ -278,6 +277,48 @@
*/
private static final DebugTracer TRACER = getTracer();
+ /**
+ * Thread local ASN1Writer and buffer.
+ */
+ private static final class ASN1WriterHolder
+ {
+ private final ASN1Writer writer;
+ private final ByteStringBuilder buffer;
+ private final int maxBufferSize;
+
+ private ASN1WriterHolder()
+ {
+ this.buffer = new ByteStringBuilder();
+ this.maxBufferSize = getMaxInternalBufferSize();
+ this.writer = ASN1.getWriter(buffer, maxBufferSize);
+ }
+ }
+
+ // Cached ASN1 writer: a thread can only write to one connection at a time.
+ private static final ThreadLocal<ASN1WriterHolder> ASN1_WRITER_CACHE =
+ new ThreadLocal<ASN1WriterHolder>()
+ {
+ /**
+ * {@inheritDoc}
+ */
+ protected ASN1WriterHolder initialValue()
+ {
+ return new ASN1WriterHolder();
+ }
+ };
+
+ private ASN1WriterHolder getASN1Writer()
+ {
+ ASN1WriterHolder holder = ASN1_WRITER_CACHE.get();
+ if (holder.maxBufferSize != getMaxInternalBufferSize())
+ {
+ // Setting has changed, so recreate the holder.
+ holder = new ASN1WriterHolder();
+ ASN1_WRITER_CACHE.set(holder);
+ }
+ return holder;
+ }
+
// The time that the last operation was completed.
private final AtomicLong lastCompletionTime;
@@ -321,10 +362,6 @@
// connection.
private final LDAPConnectionHandler connectionHandler;
- // The reference to the request handler with which this connection is
- // associated.
- private final LDAPRequestHandler requestHandler;
-
// The statistics tracker associated with this client connection.
private final LDAPStatistics statTracker;
private boolean useNanoTime=false;
@@ -354,17 +391,12 @@
// client has connected.
private final String serverAddress;
- // Holds the mapping between the worker thread and ASN1
- // writer to allow for parallel, non-blocking encoding.
- private Map<Thread,ASN1Writer> asn1WriterMap;
+
private ASN1ByteChannelReader asn1Reader;
-
private final int bufferSize;
-
private final RedirectingByteChannel saslChannel;
private final RedirectingByteChannel tlsChannel;
- private final ReentrantLock writeLock;
private volatile ConnectionSecurityProvider activeProvider = null;
private volatile ConnectionSecurityProvider tlsPendingProvider = null;
private volatile ConnectionSecurityProvider saslPendingProvider = null;
@@ -381,7 +413,7 @@
* @param protocol String representing the protocol (LDAP or LDAP+SSL).
* @throws DirectoryException If SSL initialisation fails.
*/
- public LDAPClientConnection(LDAPConnectionHandler connectionHandler,
+ LDAPClientConnection(LDAPConnectionHandler connectionHandler,
SocketChannel clientChannel, String protocol) throws DirectoryException
{
this.connectionHandler = connectionHandler;
@@ -394,7 +426,6 @@
timeoutClientChannel = new TimeoutWriteByteChannel();
opsInProgressLock = new Object();
ldapVersion = 3;
- requestHandler = null;
lastCompletionTime = new AtomicLong(TimeThread.getTime());
nextOperationID = new AtomicLong(0);
connectionValid = true;
@@ -430,9 +461,6 @@
this.asn1Reader =
ASN1.getReader(saslChannel, bufferSize, connectionHandler
.getMaxRequestSize());
- writeLock = new ReentrantLock();
-
- asn1WriterMap = new ConcurrentHashMap<Thread,ASN1Writer>();
if (connectionHandler.useSSL())
{
@@ -477,21 +505,6 @@
/**
- * Retrieves the request handler that will read requests for this
- * client connection.
- *
- * @return The request handler that will read requests for this client
- * connection, or <CODE>null</CODE> if none has been assigned
- * yet.
- */
- public LDAPRequestHandler getRequestHandler()
- {
- return requestHandler;
- }
-
-
-
- /**
* Retrieves the socket channel that can be used to communicate with
* the client.
*
@@ -632,20 +645,6 @@
/**
- * Retrieves the next operation ID that should be used for this
- * connection.
- *
- * @return The next operation ID that should be used for this
- * connection.
- */
- public long nextOperationID()
- {
- return nextOperationID.getAndIncrement();
- }
-
-
-
- /**
* Sends a response to the client based on the information in the
* provided operation.
*
@@ -930,22 +929,14 @@
* @param message
* The LDAP message to send to the client.
*/
- public void sendLDAPMessage(LDAPMessage message)
+ private void sendLDAPMessage(LDAPMessage message)
{
- // Get the writer used by this thread.
- Thread currentThread = Thread.currentThread();
- ASN1Writer asn1Writer = asn1WriterMap.get(currentThread);
+ // Use a thread local writer.
+ final ASN1WriterHolder holder = getASN1Writer();
try
{
- if (asn1Writer == null)
- {
- asn1Writer = ASN1.getWriter(saslChannel, writeLock,
- bufferSize);
- asn1WriterMap.put(currentThread, asn1Writer);
- }
-
- message.write(asn1Writer);
- asn1Writer.flush();
+ message.write(holder.writer);
+ holder.buffer.copyTo(saslChannel);
if (debugEnabled())
{
@@ -971,7 +962,21 @@
disconnect(DisconnectReason.SERVER_ERROR, false, null);
return;
}
- }
+ finally
+ {
+ // Clear and reset all of the internal buffers ready for the next usage.
+ try
+ {
+ // The ASN1Writer is based on a ByteStringBuilder so closing will cause
+ // the internal buffers to be resized if needed.
+ holder.writer.close();
+ }
+ catch (IOException ignored)
+ {
+ // Unreachable.
+ }
+ }
+ }
@@ -1187,7 +1192,7 @@
* client already has reached the maximum allowed concurrent
* requests).
*/
- public void addOperationInProgress(AbstractOperation operation)
+ private void addOperationInProgress(AbstractOperation operation)
throws DirectoryException
{
int messageID = operation.getMessageID();
@@ -1539,7 +1544,7 @@
*
* @return the ASN1 reader for this connection
*/
- protected ASN1ByteChannelReader getASN1Reader()
+ ASN1ByteChannelReader getASN1Reader()
{
return asn1Reader;
}
@@ -1552,7 +1557,7 @@
* @return number of bytes read if this connection is still valid
* or negative integer to indicate an error otherwise
*/
- public int processDataRead()
+ int processDataRead()
{
if (bindOrStartTLSInProgress.get())
{
@@ -1617,7 +1622,7 @@
* error and the client has been disconnected as a result, or
* if the client unbound from the server.
*/
- public boolean processLDAPMessage(LDAPMessage message)
+ boolean processLDAPMessage(LDAPMessage message)
{
if (keepStats)
{
@@ -2580,7 +2585,7 @@
/**
* Enable the provider that is inactive.
*/
- public void enableTLS()
+ private void enableTLS()
{
activeProvider = tlsPendingProvider;
tlsChannel.redirect(tlsPendingProvider);
@@ -2595,7 +2600,7 @@
* @param sslProvider
* The provider to set the security provider to.
*/
- public void enableSSL(ConnectionSecurityProvider sslProvider)
+ private void enableSSL(ConnectionSecurityProvider sslProvider)
{
activeProvider = sslProvider;
tlsChannel.redirect(sslProvider);
@@ -2606,7 +2611,7 @@
/**
* Enable the SASL provider that is currently inactive or pending.
*/
- public void enableSASL()
+ private void enableSASL()
{
activeProvider = saslPendingProvider;
saslChannel.redirect(saslPendingProvider);
--
Gitblit v1.10.0