mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

abobrov
09.20.2009 2bbfdcf54303286798114aee270531f65c9c3a7c
opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -181,10 +181,7 @@
  // The number of operations performed on this connection.
  // Used to compare with the resource limits of the network group.
  private long operationsPerformed;
  // Lock on the number of operations
  private final Object operationsPerformedLock;
  private final AtomicLong operationsPerformed;
  // The port on the client from which this connection originated.
  private final int clientPort;
@@ -216,10 +213,6 @@
  // in progress.
  private final Object opsInProgressLock;
  // The lock used to provide threadsafe access when sending data to the
  // client.
  private final Object transmitLock;
  // The socket channel with which this client connection is associated.
  private final SocketChannel clientChannel;
@@ -236,9 +229,7 @@
  private ASN1ByteChannelReader asn1Reader;
  private ASN1Writer asn1Writer;
  private static int APPLICATION_BUFFER_SIZE = 4096;
  private int APPLICATION_BUFFER_SIZE = 4096;
  private final RedirectingByteChannel saslChannel;
  private final RedirectingByteChannel tlsChannel;
@@ -246,6 +237,28 @@
  private volatile ConnectionSecurityProvider tlsPendingProvider = null;
  private volatile ConnectionSecurityProvider saslPendingProvider = null;
  /**
   * Thread local storage to provide each worker thread with its
   * own ASN1Writer to allow for parallel, non-blocking encoding.
   */
  private final ThreadLocal<ASN1Writer> threadLocalASN1Writer =
          new ThreadLocal<ASN1Writer>()
  {
    @Override
    protected ASN1Writer initialValue()
    {
      if (isSecure())
      {
        int appBufSize = activeProvider.getAppBufSize();
        return ASN1.getWriter(saslChannel, appBufSize);
      }
      else
      {
        return ASN1.getWriter(saslChannel,
          APPLICATION_BUFFER_SIZE);
      }
    }
  };
  /**
   * Creates a new LDAP client connection with the provided information.
@@ -270,7 +283,6 @@
    this.clientChannel = clientChannel;
    opsInProgressLock = new Object();
    transmitLock = new Object();
    ldapVersion = 3;
    requestHandler = null;
    lastCompletionTime = new AtomicLong(TimeThread.getTime());
@@ -278,8 +290,7 @@
    connectionValid = true;
    disconnectRequested = false;
    operationsInProgress = new ConcurrentHashMap<Integer, Operation>();
    operationsPerformed = 0;
    operationsPerformedLock = new Object();
    operationsPerformed = new AtomicLong(0);
    keepStats = connectionHandler.keepStats();
    this.protocol = protocol;
    writeSelector = new AtomicReference<Selector>();
@@ -299,6 +310,8 @@
      this.useNanoTime=DirectoryServer.getUseNanoTime();
    }
    APPLICATION_BUFFER_SIZE = connectionHandler.getBufferSize();
    tlsChannel =
        RedirectingByteChannel.getRedirectingByteChannel(clientChannel);
    saslChannel =
@@ -794,42 +807,24 @@
   */
  public void sendLDAPMessage(LDAPMessage message)
  {
    // Get the buffer used by this thread.
    // Get the writer used by this thread.
    ASN1Writer asn1Writer = threadLocalASN1Writer.get();
    try
    {
      // Make sure that we can only send one message at a time. This
      // locking will not have any impact on the ability to read
      // requests from the client.
      synchronized (transmitLock)
      message.write(asn1Writer);
      asn1Writer.flush();
      if (debugEnabled())
      {
        if (asn1Writer == null)
        {
          if (isSecure())
          {
            int appBufSize = activeProvider.getAppBufSize();
            asn1Writer = ASN1.getWriter(saslChannel, appBufSize);
          }
          else
          {
            asn1Writer =
                ASN1.getWriter(saslChannel, APPLICATION_BUFFER_SIZE);
          }
        }
        TRACER.debugProtocolElement(DebugLogLevel.VERBOSE,
          message.toString());
      }
        message.write(asn1Writer);
        asn1Writer.flush();
        if(debugEnabled())
        {
          TRACER.debugProtocolElement(DebugLogLevel.VERBOSE,
              message.toString());
        }
        if (keepStats)
        {
          // TODO hard-coded for now, flush probably needs to
          // return how many bytes were flushed.
          statTracker.updateMessageWritten(message, 4096);
        }
      if (keepStats)
      {
        // TODO hard-coded for now, flush probably needs to
        // return how many bytes were flushed.
        statTracker.updateMessageWritten(message, 4096);
      }
    }
    catch (Exception e)
@@ -1078,9 +1073,12 @@
              message);
        }
        // Add the operation to the list of operations in progress for
        // this connection.
        Operation op = operationsInProgress.putIfAbsent(messageID, operation);
        // See if there is already an operation in progress with the
        // same message ID. If so, then we can't allow it.
        Operation op = operationsInProgress.get(messageID);
        if (op != null)
        {
          Message message =
@@ -1089,10 +1087,6 @@
              message);
        }
        // Add the operation to the list of operations in progress for
        // this connection.
        operationsInProgress.put(messageID, operation);
        // Try to add the operation to the work queue,
        // or run it synchronously (typically for the administration
        // connector)
@@ -1399,12 +1393,19 @@
  @Override
  public long getNumberOfOperations()
  {
    long tmpNumberOfOperations;
    synchronized (operationsPerformedLock)
    {
      tmpNumberOfOperations = operationsPerformed;
    }
    return tmpNumberOfOperations;
    return operationsPerformed.get();
  }
  /**
   * Returns the ASN1 reader for this connection.
   *
   * @return the ASN1 reader for this connection
   */
  protected ASN1ByteChannelReader getASN1Reader()
  {
    return asn1Reader;
  }
@@ -1412,61 +1413,40 @@
  /**
   * Process data read.
   *
   * @return {@code true} if this connection is still valid.
   * @return number of bytes read if this connection is still valid
   *         or negative integer to indicate an error otherwise
   */
  public boolean processDataRead()
  public int processDataRead()
  {
    while (true)
    if (bindOrStartTLSInProgress.get())
    {
      if(bindOrStartTLSInProgress.get())
      {
        // We should wait for the bind or startTLS to finish before
        // reading any more data off the socket.
        return true;
      }
      // We should wait for the bind or startTLS to finish before
      // reading any more data off the socket.
      return 0;
    }
      try
    try
    {
      int result = asn1Reader.processChannelData();
      if (result < 0)
      {
        int result = asn1Reader.processChannelData();
        if (result < 0)
        {
          // The connection has been closed by the client. Disconnect
          // and return.
          disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
          return false;
        }
        if (result == 0)
        {
          // We have read all the data that there is to read right now
          // (or there wasn't any in the first place). Just return and
          // wait for future notification.
          return true;
        }
        else
        {
          // Decode all complete elements from the last read
          while (asn1Reader.elementAvailable())
          {
            if (!processLDAPMessage(LDAPReader.readMessage(asn1Reader)))
            {
              // Fatal connection error - client disconnected.
              return false;
            }
          }
        }
        // The connection has been closed by the client. Disconnect
        // and return.
        disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
        return -1;
      }
      catch (Exception e)
      return result;
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        Message m =
            ERR_LDAP_CLIENT_DECODE_LDAP_MESSAGE_FAILED.get(String
                .valueOf(e));
        disconnect(DisconnectReason.PROTOCOL_ERROR, false, m);
        return false;
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      Message m =
        ERR_LDAP_CLIENT_DECODE_LDAP_MESSAGE_FAILED.get(String.valueOf(e));
      disconnect(DisconnectReason.PROTOCOL_ERROR, true, m);
      return -1;
    }
  }
@@ -1485,17 +1465,14 @@
   *         error and the client has been disconnected as a result, or
   *         if the client unbound from the server.
   */
  private boolean processLDAPMessage(LDAPMessage message)
  public boolean processLDAPMessage(LDAPMessage message)
  {
    if (keepStats)
    {
      statTracker.updateMessageRead(message);
      this.getNetworkGroup().updateMessageRead(message);
    }
    synchronized (operationsPerformedLock)
    {
      operationsPerformed++;
    }
    operationsPerformed.getAndIncrement();
    List<Control> opControls = message.getControls();