mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
13.05.2012 9ff69f265dae8f0647fb9c204fda070eafe25613
opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -31,6 +31,7 @@
import static org.opends.messages.CoreMessages.ERR_ENQUEUE_BIND_IN_PROGRESS;
import static org.opends.messages.ProtocolMessages.*;
import static org.opends.server.core.DirectoryServer.getMaxInternalBufferSize;
import static org.opends.server.loggers.AccessLogger.logDisconnect;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
@@ -48,11 +49,9 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -79,10 +78,10 @@
 * connection handler and have its requests decoded by an LDAP request
 * handler.
 */
public class LDAPClientConnection extends ClientConnection implements
public final class LDAPClientConnection extends ClientConnection implements
    TLSCapableConnection
{
  /**
   * A runnable whose task is to close down all IO related channels
   * associated with a client connection after a small delay.
@@ -194,8 +193,8 @@
      if (selector == null)
      {
        // The client connection does not provide a selector, so we'll
        // fall back
        // to a more inefficient way that will work without a selector.
        // fall back to a more inefficient way that will work without a
        // selector.
        while (byteBuffer.hasRemaining()
               && (System.currentTimeMillis() < stopTime))
        {
@@ -278,6 +277,48 @@
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * Thread local ASN1Writer and buffer.
   */
  private static final class ASN1WriterHolder
  {
    private final ASN1Writer writer;
    private final ByteStringBuilder buffer;
    private final int maxBufferSize;
    private ASN1WriterHolder()
    {
      this.buffer = new ByteStringBuilder();
      this.maxBufferSize = getMaxInternalBufferSize();
      this.writer = ASN1.getWriter(buffer, maxBufferSize);
    }
  }
  // Cached ASN1 writer: a thread can only write to one connection at a time.
  private static final ThreadLocal<ASN1WriterHolder> ASN1_WRITER_CACHE =
      new ThreadLocal<ASN1WriterHolder>()
  {
    /**
     * {@inheritDoc}
     */
    protected ASN1WriterHolder initialValue()
    {
      return new ASN1WriterHolder();
    }
  };
  private ASN1WriterHolder getASN1Writer()
  {
    ASN1WriterHolder holder = ASN1_WRITER_CACHE.get();
    if (holder.maxBufferSize != getMaxInternalBufferSize())
    {
      // Setting has changed, so recreate the holder.
      holder = new ASN1WriterHolder();
      ASN1_WRITER_CACHE.set(holder);
    }
    return holder;
  }
  // The time that the last operation was completed.
  private final AtomicLong lastCompletionTime;
@@ -321,10 +362,6 @@
  // connection.
  private final LDAPConnectionHandler connectionHandler;
  // The reference to the request handler with which this connection is
  // associated.
  private final LDAPRequestHandler requestHandler;
  // The statistics tracker associated with this client connection.
  private final LDAPStatistics statTracker;
  private boolean useNanoTime=false;
@@ -354,17 +391,12 @@
  // client has connected.
  private final String serverAddress;
  // Holds the mapping between the worker thread and ASN1
  // writer to allow for parallel, non-blocking encoding.
  private Map<Thread,ASN1Writer> asn1WriterMap;
  private ASN1ByteChannelReader asn1Reader;
  private final int bufferSize;
  private final RedirectingByteChannel saslChannel;
  private final RedirectingByteChannel tlsChannel;
  private final ReentrantLock writeLock;
  private volatile ConnectionSecurityProvider activeProvider = null;
  private volatile ConnectionSecurityProvider tlsPendingProvider = null;
  private volatile ConnectionSecurityProvider saslPendingProvider = null;
@@ -381,7 +413,7 @@
   * @param  protocol String representing the protocol (LDAP or LDAP+SSL).
   * @throws DirectoryException If SSL initialisation fails.
   */
  public LDAPClientConnection(LDAPConnectionHandler connectionHandler,
  LDAPClientConnection(LDAPConnectionHandler connectionHandler,
      SocketChannel clientChannel, String protocol) throws DirectoryException
  {
    this.connectionHandler = connectionHandler;
@@ -394,7 +426,6 @@
    timeoutClientChannel = new TimeoutWriteByteChannel();
    opsInProgressLock = new Object();
    ldapVersion = 3;
    requestHandler = null;
    lastCompletionTime = new AtomicLong(TimeThread.getTime());
    nextOperationID = new AtomicLong(0);
    connectionValid = true;
@@ -430,9 +461,6 @@
    this.asn1Reader =
        ASN1.getReader(saslChannel, bufferSize, connectionHandler
            .getMaxRequestSize());
    writeLock = new ReentrantLock();
    asn1WriterMap = new ConcurrentHashMap<Thread,ASN1Writer>();
    if (connectionHandler.useSSL())
    {
@@ -477,21 +505,6 @@
  /**
   * Retrieves the request handler that will read requests for this
   * client connection.
   *
   * @return The request handler that will read requests for this client
   *         connection, or <CODE>null</CODE> if none has been assigned
   *         yet.
   */
  public LDAPRequestHandler getRequestHandler()
  {
    return requestHandler;
  }
  /**
   * Retrieves the socket channel that can be used to communicate with
   * the client.
   *
@@ -632,20 +645,6 @@
  /**
   * Retrieves the next operation ID that should be used for this
   * connection.
   *
   * @return The next operation ID that should be used for this
   *         connection.
   */
  public long nextOperationID()
  {
    return nextOperationID.getAndIncrement();
  }
  /**
   * Sends a response to the client based on the information in the
   * provided operation.
   *
@@ -930,22 +929,14 @@
   * @param message
   *          The LDAP message to send to the client.
   */
  public void sendLDAPMessage(LDAPMessage message)
  private void sendLDAPMessage(LDAPMessage message)
  {
    // Get the writer used by this thread.
    Thread currentThread = Thread.currentThread();
    ASN1Writer asn1Writer = asn1WriterMap.get(currentThread);
    // Use a thread local writer.
    final ASN1WriterHolder holder = getASN1Writer();
    try
    {
      if (asn1Writer == null)
      {
        asn1Writer = ASN1.getWriter(saslChannel, writeLock,
                  bufferSize);
        asn1WriterMap.put(currentThread, asn1Writer);
      }
      message.write(asn1Writer);
      asn1Writer.flush();
      message.write(holder.writer);
      holder.buffer.copyTo(saslChannel);
      if (debugEnabled())
      {
@@ -971,7 +962,21 @@
      disconnect(DisconnectReason.SERVER_ERROR, false, null);
      return;
    }
  }
    finally
    {
      // Clear and reset all of the internal buffers ready for the next usage.
      try
      {
        // The ASN1Writer is based on a ByteStringBuilder so closing will cause
        // the internal buffers to be resized if needed.
        holder.writer.close();
      }
      catch (IOException ignored)
      {
        // Unreachable.
      }
    }
 }
@@ -1187,7 +1192,7 @@
   *           client already has reached the maximum allowed concurrent
   *           requests).
   */
  public void addOperationInProgress(AbstractOperation operation)
  private void addOperationInProgress(AbstractOperation operation)
      throws DirectoryException
  {
    int messageID = operation.getMessageID();
@@ -1539,7 +1544,7 @@
   *
   * @return the ASN1 reader for this connection
   */
  protected ASN1ByteChannelReader getASN1Reader()
  ASN1ByteChannelReader getASN1Reader()
  {
    return asn1Reader;
  }
@@ -1552,7 +1557,7 @@
   * @return number of bytes read if this connection is still valid
   *         or negative integer to indicate an error otherwise
   */
  public int processDataRead()
  int processDataRead()
  {
    if (bindOrStartTLSInProgress.get())
    {
@@ -1617,7 +1622,7 @@
   *         error and the client has been disconnected as a result, or
   *         if the client unbound from the server.
   */
  public boolean processLDAPMessage(LDAPMessage message)
  boolean processLDAPMessage(LDAPMessage message)
  {
    if (keepStats)
    {
@@ -2580,7 +2585,7 @@
  /**
   * Enable the provider that is inactive.
   */
  public void enableTLS()
  private void enableTLS()
  {
    activeProvider = tlsPendingProvider;
    tlsChannel.redirect(tlsPendingProvider);
@@ -2595,7 +2600,7 @@
   * @param sslProvider
   *          The provider to set the security provider to.
   */
  public void enableSSL(ConnectionSecurityProvider sslProvider)
  private void enableSSL(ConnectionSecurityProvider sslProvider)
  {
    activeProvider = sslProvider;
    tlsChannel.redirect(sslProvider);
@@ -2606,7 +2611,7 @@
  /**
   * Enable the SASL provider that is currently inactive or pending.
   */
  public void enableSASL()
  private void enableSASL()
  {
    activeProvider = saslPendingProvider;
    saslChannel.redirect(saslPendingProvider);