mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

abobrov
09.20.2009 2bbfdcf54303286798114aee270531f65c9c3a7c
- CMT improvements to parallel request and response processing.
9 files modified
449 ■■■■■ changed files
opends/resource/config/config.ldif 2 ●●●●● patch | view | raw | blame | history
opends/resource/schema/02-config.ldif 3 ●●●● patch | view | raw | blame | history
opends/src/admin/defn/org/opends/server/admin/std/LDAPConnectionHandlerConfiguration.xml 25 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/admin/AdministrationConnector.java 14 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/controls/EntryChangelogNotificationControl.java 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java 51 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java 195 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java 145 ●●●●● patch | view | raw | blame | history
opends/resource/config/config.ldif
@@ -428,6 +428,7 @@
ds-cfg-allow-tcp-reuse-address: true
ds-cfg-send-rejection-notice: true
ds-cfg-max-request-size: 5 megabytes
ds-cfg-buffer-size: 4096 bytes
ds-cfg-max-blocked-write-time-limit: 2 minutes
ds-cfg-num-request-handlers: 2
ds-cfg-allow-start-tls: false
@@ -452,6 +453,7 @@
ds-cfg-allow-tcp-reuse-address: true
ds-cfg-send-rejection-notice: true
ds-cfg-max-request-size: 5 megabytes
ds-cfg-buffer-size: 4096 bytes
ds-cfg-max-blocked-write-time-limit: 2 minutes
ds-cfg-num-request-handlers: 2
ds-cfg-allow-start-tls: false
opends/resource/schema/02-config.ldif
@@ -2601,7 +2601,8 @@
        ds-cfg-trust-manager-provider $
        ds-cfg-ssl-protocol $
        ds-cfg-ssl-cipher-suite $
        ds-cfg-max-blocked-write-time-limit )
        ds-cfg-max-blocked-write-time-limit $
        ds-cfg-buffer-size )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.14
  NAME 'ds-cfg-entry-cache'
opends/src/admin/defn/org/opends/server/admin/std/LDAPConnectionHandlerConfiguration.xml
@@ -23,7 +23,7 @@
  ! CDDL HEADER END
  !
  !
  !      Copyright 2007-2008 Sun Microsystems, Inc.
  !      Copyright 2007-2009 Sun Microsystems, Inc.
  ! -->
<adm:managed-object name="ldap-connection-handler"
  plural-name="ldap-connection-handlers"
@@ -438,6 +438,29 @@
      </ldap:attribute>
    </adm:profile>
  </adm:property>
  <adm:property name="buffer-size" advanced="true">
    <adm:synopsis>
      Specifies the size in bytes of the LDAP response message write buffer.
    </adm:synopsis>
    <adm:description>
      This property specifies the size of the write buffer that the server
      allocates for each client connection and uses to buffer LDAP response
      message data when writing.
    </adm:description>
    <adm:default-behavior>
      <adm:defined>
        <adm:value>4096 bytes</adm:value>
      </adm:defined>
    </adm:default-behavior>
    <adm:syntax>
      <adm:size lower-limit="1b" upper-limit="2147483647b"></adm:size>
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
        <ldap:name>ds-cfg-buffer-size</ldap:name>
      </ldap:attribute>
    </adm:profile>
  </adm:property>
  <adm:property name="num-request-handlers" advanced="true">
    <adm:synopsis>
      Specifies the number of request handlers that are used to read
opends/src/server/org/opends/server/admin/AdministrationConnector.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.admin;
@@ -129,6 +129,8 @@
  private static final int ADMIN_MAX_REQUEST_SIZE = 5000000; // 5 Mb
  private static final int ADMIN_WRITE_BUFFER_SIZE = 4096;
  private static final int ADMIN_NUM_REQUEST_HANDLERS = 1;
  private static final boolean ADMIN_SEND_REJECTION_NOTICE = true;
@@ -419,6 +421,16 @@
    /**
     * {@inheritDoc}
     */
    public long getBufferSize()
    {
      // The administration connector does not read this value from
      // configuration; it always reports the fixed
      // ADMIN_WRITE_BUFFER_SIZE constant (an int, widened to long here).
      return ADMIN_WRITE_BUFFER_SIZE;
    }
    /**
     * {@inheritDoc}
     */
    public int getNumRequestHandlers()
    {
      return ADMIN_NUM_REQUEST_HANDLERS;
opends/src/server/org/opends/server/controls/EntryChangelogNotificationControl.java
@@ -145,7 +145,6 @@
    writer.writeOctetString(cookie.toString());
    writer.writeEndSequence();
    writer.writeEndSequence();
    writer.flush();
  }
opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
@@ -32,6 +32,7 @@
import java.nio.channels.WritableByteChannel;
import java.io.OutputStream;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;
/**
 * This class is for writing ASN.1 elements directly to an
@@ -50,6 +51,9 @@
  // The NIO ByteBuffer to write to.
  private final ByteBuffer byteBuffer;
  // The lock to ensure atomic message flush.
  private final ReentrantLock flushLock;
  /**
   * An adaptor class provides a streaming interface to write to a
   * NIO ByteBuffer. This class is also responsible for writing
@@ -65,7 +69,7 @@
      if(!byteBuffer.hasRemaining())
      {
        // No more space left in the buffer, send out to the channel.
        ASN1ByteChannelWriter.this.flush();
        ASN1ByteChannelWriter.this.lockAndFlush();
      }
      byteBuffer.put((byte)i);
    }
@@ -95,7 +99,8 @@
        {
          byteBuffer.put(bytes, i + i1 - bytesToWrite, len);
          bytesToWrite -= len;
          ASN1ByteChannelWriter.this.flush();
          // No more space left in the buffer, send out to the channel.
          ASN1ByteChannelWriter.this.lockAndFlush();
        }
        else
        {
@@ -117,6 +122,7 @@
  {
    this.byteChannel = byteChannel;
    this.byteBuffer = ByteBuffer.allocate(writeBufferSize);
    this.flushLock = new ReentrantLock(false);
    ByteBufferOutputStream bufferStream = new ByteBufferOutputStream();
    this.writer = new ASN1OutputStreamWriter(bufferStream);
@@ -298,13 +304,52 @@
  /**
   * Flush the entire contents of the NIO ByteBuffer out to the
   * channel.
   * channel.  Note that the caller should only invoke this
   * method once it has finished writing the entire message and
   * not earlier. Calling this method prematurely while this
   * writer is shared among several threads can cause messages
   * to interleave. While it is possible to ensure safety
   * through single-threaded usage or external locking, doing
   * so is generally not advised; callers should instead follow
   * the rule of not flushing until the entire message has been
   * written.
   *
   * @throws IOException If an error occurs while flushing.
   */
  public void flush() throws IOException
  {
    // Switch the buffer from filling to draining: the limit becomes the
    // current position and the position is reset to zero.
    byteBuffer.flip();
    try
    {
      // Take the flush lock unless this thread already holds it.
      // lockAndFlush() acquires the same lock when the buffer fills up
      // part-way through a message.
      // NOTE(review): presumably lockAndFlush() leaves the lock held so
      // that it is released here once the message is complete -- the end
      // of that method is not visible in this view; confirm.
      if (!flushLock.isHeldByCurrentThread())
      {
        flushLock.lock();
      }
      // A single write() call may not consume the whole buffer, so keep
      // writing until nothing remains.
      while(byteBuffer.hasRemaining())
      {
        byteChannel.write(byteBuffer);
      }
    }
    finally
    {
      // Runs on both the normal and the exceptional path.
      flushLock.unlock();
    }
    // Reset the buffer for the next message. Only reached when the write
    // loop completed without throwing.
    byteBuffer.clear();
  }
  /**
   * Take the flush lock and flush the entire contents of
   * the NIO ByteBuffer out to the channel.
   *
   * @throws IOException If an error occurs while flushing.
   */
  private void lockAndFlush() throws IOException
  {
    byteBuffer.flip();
    if (!flushLock.isHeldByCurrentThread())
    {
      flushLock.lock();
    }
    while(byteBuffer.hasRemaining())
    {
      byteChannel.write(byteBuffer);
opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -181,10 +181,7 @@
  // The number of operations performed on this connection.
  // Used to compare with the resource limits of the network group.
  private long operationsPerformed;
  // Lock on the number of operations
  private final Object operationsPerformedLock;
  private final AtomicLong operationsPerformed;
  // The port on the client from which this connection originated.
  private final int clientPort;
@@ -216,10 +213,6 @@
  // in progress.
  private final Object opsInProgressLock;
  // The lock used to provide threadsafe access when sending data to the
  // client.
  private final Object transmitLock;
  // The socket channel with which this client connection is associated.
  private final SocketChannel clientChannel;
@@ -236,9 +229,7 @@
  private ASN1ByteChannelReader asn1Reader;
  private ASN1Writer asn1Writer;
  private static int APPLICATION_BUFFER_SIZE = 4096;
  private int APPLICATION_BUFFER_SIZE = 4096;
  private final RedirectingByteChannel saslChannel;
  private final RedirectingByteChannel tlsChannel;
@@ -246,6 +237,28 @@
  private volatile ConnectionSecurityProvider tlsPendingProvider = null;
  private volatile ConnectionSecurityProvider saslPendingProvider = null;
  /**
   * Thread local storage to provide each worker thread with its
   * own ASN1Writer to allow for parallel, non-blocking encoding.
   */
  private final ThreadLocal<ASN1Writer> threadLocalASN1Writer =
          new ThreadLocal<ASN1Writer>()
  {
    @Override
    protected ASN1Writer initialValue()
    {
      if (isSecure())
      {
        int appBufSize = activeProvider.getAppBufSize();
        return ASN1.getWriter(saslChannel, appBufSize);
      }
      else
      {
        return ASN1.getWriter(saslChannel,
          APPLICATION_BUFFER_SIZE);
      }
    }
  };
  /**
   * Creates a new LDAP client connection with the provided information.
@@ -270,7 +283,6 @@
    this.clientChannel = clientChannel;
    opsInProgressLock = new Object();
    transmitLock = new Object();
    ldapVersion = 3;
    requestHandler = null;
    lastCompletionTime = new AtomicLong(TimeThread.getTime());
@@ -278,8 +290,7 @@
    connectionValid = true;
    disconnectRequested = false;
    operationsInProgress = new ConcurrentHashMap<Integer, Operation>();
    operationsPerformed = 0;
    operationsPerformedLock = new Object();
    operationsPerformed = new AtomicLong(0);
    keepStats = connectionHandler.keepStats();
    this.protocol = protocol;
    writeSelector = new AtomicReference<Selector>();
@@ -299,6 +310,8 @@
      this.useNanoTime=DirectoryServer.getUseNanoTime();
    }
    APPLICATION_BUFFER_SIZE = connectionHandler.getBufferSize();
    tlsChannel =
        RedirectingByteChannel.getRedirectingByteChannel(clientChannel);
    saslChannel =
@@ -794,42 +807,24 @@
   */
  public void sendLDAPMessage(LDAPMessage message)
  {
    // Get the buffer used by this thread.
    // Get the writer used by this thread.
    ASN1Writer asn1Writer = threadLocalASN1Writer.get();
    try
    {
      // Make sure that we can only send one message at a time. This
      // locking will not have any impact on the ability to read
      // requests from the client.
      synchronized (transmitLock)
      message.write(asn1Writer);
      asn1Writer.flush();
      if (debugEnabled())
      {
        if (asn1Writer == null)
        {
          if (isSecure())
          {
            int appBufSize = activeProvider.getAppBufSize();
            asn1Writer = ASN1.getWriter(saslChannel, appBufSize);
          }
          else
          {
            asn1Writer =
                ASN1.getWriter(saslChannel, APPLICATION_BUFFER_SIZE);
          }
        }
        TRACER.debugProtocolElement(DebugLogLevel.VERBOSE,
          message.toString());
      }
        message.write(asn1Writer);
        asn1Writer.flush();
        if(debugEnabled())
        {
          TRACER.debugProtocolElement(DebugLogLevel.VERBOSE,
              message.toString());
        }
        if (keepStats)
        {
          // TODO hard-coded for now, flush probably needs to
          // return how many bytes were flushed.
          statTracker.updateMessageWritten(message, 4096);
        }
      if (keepStats)
      {
        // TODO hard-coded for now, flush probably needs to
        // return how many bytes were flushed.
        statTracker.updateMessageWritten(message, 4096);
      }
    }
    catch (Exception e)
@@ -1078,9 +1073,12 @@
              message);
        }
        // Add the operation to the list of operations in progress for
        // this connection.
        Operation op = operationsInProgress.putIfAbsent(messageID, operation);
        // See if there is already an operation in progress with the
        // same message ID. If so, then we can't allow it.
        Operation op = operationsInProgress.get(messageID);
        if (op != null)
        {
          Message message =
@@ -1089,10 +1087,6 @@
              message);
        }
        // Add the operation to the list of operations in progress for
        // this connection.
        operationsInProgress.put(messageID, operation);
        // Try to add the operation to the work queue,
        // or run it synchronously (typically for the administration
        // connector)
@@ -1399,12 +1393,19 @@
  @Override
  public long getNumberOfOperations()
  {
    long tmpNumberOfOperations;
    synchronized (operationsPerformedLock)
    {
      tmpNumberOfOperations = operationsPerformed;
    }
    return tmpNumberOfOperations;
    return operationsPerformed.get();
  }
  /**
   * Returns the ASN1 reader for this connection.
   *
   * @return the ASN1 reader for this connection
   */
  protected ASN1ByteChannelReader getASN1Reader()
  {
    return asn1Reader;
  }
@@ -1412,61 +1413,40 @@
  /**
   * Process data read.
   *
   * @return {@code true} if this connection is still valid.
   * @return number of bytes read if this connection is still valid
   *         or negative integer to indicate an error otherwise
   */
  public boolean processDataRead()
  public int processDataRead()
  {
    while (true)
    if (bindOrStartTLSInProgress.get())
    {
      if(bindOrStartTLSInProgress.get())
      {
        // We should wait for the bind or startTLS to finish before
        // reading any more data off the socket.
        return true;
      }
      // We should wait for the bind or startTLS to finish before
      // reading any more data off the socket.
      return 0;
    }
      try
    try
    {
      int result = asn1Reader.processChannelData();
      if (result < 0)
      {
        int result = asn1Reader.processChannelData();
        if (result < 0)
        {
          // The connection has been closed by the client. Disconnect
          // and return.
          disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
          return false;
        }
        if (result == 0)
        {
          // We have read all the data that there is to read right now
          // (or there wasn't any in the first place). Just return and
          // wait for future notification.
          return true;
        }
        else
        {
          // Decode all complete elements from the last read
          while (asn1Reader.elementAvailable())
          {
            if (!processLDAPMessage(LDAPReader.readMessage(asn1Reader)))
            {
              // Fatal connection error - client disconnected.
              return false;
            }
          }
        }
        // The connection has been closed by the client. Disconnect
        // and return.
        disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
        return -1;
      }
      catch (Exception e)
      return result;
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        Message m =
            ERR_LDAP_CLIENT_DECODE_LDAP_MESSAGE_FAILED.get(String
                .valueOf(e));
        disconnect(DisconnectReason.PROTOCOL_ERROR, false, m);
        return false;
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      Message m =
        ERR_LDAP_CLIENT_DECODE_LDAP_MESSAGE_FAILED.get(String.valueOf(e));
      disconnect(DisconnectReason.PROTOCOL_ERROR, true, m);
      return -1;
    }
  }
@@ -1485,17 +1465,14 @@
   *         error and the client has been disconnected as a result, or
   *         if the client unbound from the server.
   */
  private boolean processLDAPMessage(LDAPMessage message)
  public boolean processLDAPMessage(LDAPMessage message)
  {
    if (keepStats)
    {
      statTracker.updateMessageRead(message);
      this.getNetworkGroup().updateMessageRead(message);
    }
    synchronized (operationsPerformedLock)
    {
      operationsPerformed++;
    }
    operationsPerformed.getAndIncrement();
    List<Control> opControls = message.getControls();
opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
@@ -601,6 +601,19 @@
  /**
   * Retrieves the size in bytes of the LDAP response message
   * write buffer defined for this connection handler.
   *
   * @return The size in bytes of the LDAP response
   *         message write buffer.
   */
  public int getBufferSize() {
    return (int) currentConfig.getBufferSize();
  }
  /**
   * {@inheritDoc}
   */
  @Override
opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java
@@ -40,15 +40,16 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.opends.messages.Message;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.ServerShutdownListener;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.ErrorLogger;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.asn1.ASN1ByteChannelReader;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DisconnectReason;
import org.opends.server.types.InitializationException;
@@ -71,16 +72,6 @@
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * The buffer size in bytes to use when reading data from a client.
   */
  public static final int BUFFER_SIZE = 8192;
  // Indicates whether the Directory Server is in the process of shutting down.
  private volatile boolean shutdownRequested = false;
@@ -89,12 +80,17 @@
  // The queue that will be used to hold the set of pending connections that
  // need to be registered with the selector.
  // TODO: revisit, see Issue 4202.
  private List<LDAPClientConnection> pendingConnections =
    new LinkedList<LDAPClientConnection>();
  // Lock object for synchronizing access to the pending connections queue.
  private final Object pendingConnectionsLock = new Object();
  // The list of connections ready for request processing.
  private LinkedList<LDAPClientConnection> readyConnections =
    new LinkedList<LDAPClientConnection>();
  // The selector that will be used to monitor the client connections.
  private final Selector selector;
@@ -180,6 +176,89 @@
    // loop, check for new requests, then check for new connections.
    while (!shutdownRequested)
    {
      LDAPClientConnection readyConnection = null;
      while ((readyConnection = readyConnections.poll()) != null)
      {
        try
        {
          ASN1ByteChannelReader asn1Reader = readyConnection.getASN1Reader();
          boolean ldapMessageProcessed = false;
          while (true)
          {
            if (asn1Reader.elementAvailable())
            {
              if (!ldapMessageProcessed)
              {
                if (readyConnection.processLDAPMessage(
                    LDAPReader.readMessage(asn1Reader)))
                {
                  ldapMessageProcessed = true;
                }
                else
                {
                  break;
                }
              }
              else
              {
                readyConnections.add(readyConnection);
                break;
              }
            }
            else
            {
              if (readyConnection.processDataRead() <= 0)
              {
                break;
              }
            }
          }
        }
        catch (Exception e)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
        }
      }
      // Check to see if we have any pending connections that need to be
      // registered with the selector.
      List<LDAPClientConnection> tmp = null;
      synchronized (pendingConnectionsLock)
      {
        if (!pendingConnections.isEmpty())
        {
          tmp = pendingConnections;
          pendingConnections = new LinkedList<LDAPClientConnection>();
        }
      }
      if (tmp != null)
      {
        for (LDAPClientConnection c : tmp)
        {
          try
          {
            SocketChannel socketChannel = c.getSocketChannel();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ, c);
          }
          catch (Exception e)
          {
            if (debugEnabled())
            {
              TRACER.debugCaught(DebugLogLevel.ERROR, e);
            }
            c.disconnect(DisconnectReason.SERVER_ERROR, true,
                ERR_LDAP_REQHANDLER_CANNOT_REGISTER.get(handlerName,
                    String.valueOf(e)));
          }
        }
      }
      // Create a copy of the selection keys which can be used in a
      // thread-safe manner by getClientConnections. This copy is only
      // updated once per loop, so may not be accurate.
@@ -226,10 +305,14 @@
                try
                {
                  if (! clientConnection.processDataRead())
                  int readResult = clientConnection.processDataRead();
                  if (readResult < 0)
                  {
                    key.cancel();
                  }
                  if (readResult > 0) {
                    readyConnections.add(clientConnection);
                  }
                }
                catch (Exception e)
                {
@@ -307,43 +390,6 @@
          }
        }
      }
      // Check to see if we have any pending connections that need to be
      // registered with the selector.
      List<LDAPClientConnection> tmp = null;
      synchronized (pendingConnectionsLock)
      {
        if (!pendingConnections.isEmpty())
        {
          tmp = pendingConnections;
          pendingConnections = new LinkedList<LDAPClientConnection>();
        }
      }
      if (tmp != null)
      {
        for (LDAPClientConnection c : tmp)
        {
          try
          {
            SocketChannel socketChannel = c.getSocketChannel();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ, c);
          }
          catch (Exception e)
          {
            if (debugEnabled())
            {
              TRACER.debugCaught(DebugLogLevel.ERROR, e);
            }
            c.disconnect(DisconnectReason.SERVER_ERROR, true,
                ERR_LDAP_REQHANDLER_CANNOT_REGISTER.get(handlerName,
                    String.valueOf(e)));
          }
        }
      }
    }
    // Disconnect all active connections.
@@ -439,7 +485,6 @@
      return false;
    }
    // Try to add the new connection to the queue.  If it succeeds, then wake
    // up the selector so it will be picked up right away.  Otherwise,
    // disconnect the client.