mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

neil_a_wilson
12.03.2007 47be44124da7f6ad42bed03a24701ca07c00918d
opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -39,7 +39,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -190,10 +189,10 @@
  // The lock used to provide threadsafe access to the set of operations in
  // progress.
  private ReentrantLock opsInProgressLock;
  private Object opsInProgressLock;
  // The lock used to provide threadsafe access when sending data to the client.
  private ReentrantLock transmitLock;
  private Object transmitLock;
  // The socket channel with which this client connection is associated.
  private SocketChannel clientChannel;
@@ -234,8 +233,8 @@
    this.securityProvider      = null;
    this.clearSecurityProvider = null;
    opsInProgressLock = new ReentrantLock();
    transmitLock      = new ReentrantLock();
    opsInProgressLock = new Object();
    transmitLock      = new Object();
    elementReadState         = ELEMENT_READ_STATE_NEED_TYPE;
    elementType              = 0x00;
@@ -874,60 +873,57 @@
    // Make sure that we can only send one message at a time.  This locking will
    // not have any impact on the ability to read requests from the client.
    transmitLock.lock();
    try
    synchronized (transmitLock)
    {
      try
      {
        int bytesWritten = messageBuffer.limit() - messageBuffer.position();
        if (! secProvider.writeData(messageBuffer))
        try
        {
          int bytesWritten = messageBuffer.limit() - messageBuffer.position();
          if (! secProvider.writeData(messageBuffer))
          {
            return;
          }
          TRACER.debugProtocolElement(DebugLogLevel.VERBOSE, message);
          TRACER.debugProtocolElement(DebugLogLevel.VERBOSE, messageElement);
          messageBuffer.rewind();
          if (debugEnabled())
          {
            TRACER.debugData(DebugLogLevel.VERBOSE, messageBuffer);
          }
          if (keepStats)
          {
            statTracker.updateMessageWritten(message, bytesWritten);
          }
        }
        catch (@Deprecated Exception e)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
          // We were unable to send the message due to some other internal
          // problem.  Disconnect from the client and return.
          disconnect(DisconnectReason.SERVER_ERROR, true, null);
          return;
        }
        TRACER.debugProtocolElement(DebugLogLevel.VERBOSE, message);
        TRACER.debugProtocolElement(DebugLogLevel.VERBOSE, messageElement);
        messageBuffer.rewind();
        if (debugEnabled())
        {
          TRACER.debugData(DebugLogLevel.VERBOSE, messageBuffer);
        }
        if (keepStats)
        {
          statTracker.updateMessageWritten(message, bytesWritten);
        }
      }
      catch (@Deprecated Exception e)
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        // We were unable to send the message due to some other internal
        // problem.  Disconnect from the client and return.
        // FIXME -- Log a message or something
        disconnect(DisconnectReason.SERVER_ERROR, true, null);
        return;
      }
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      // FIXME -- Log a message or something
      disconnect(DisconnectReason.SERVER_ERROR, true, null);
      return;
    }
    finally
    {
      transmitLock.unlock();
    }
  }
@@ -974,22 +970,19 @@
    // Set a flag indicating that the connection is being terminated so that no
    // new requests will be accepted.  Also cancel all operations in progress.
    opsInProgressLock.lock();
    try
    synchronized (opsInProgressLock)
    {
      disconnectRequested = true;
    }
    catch (Exception e)
    {
      if (debugEnabled())
      try
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        disconnectRequested = true;
      }
    }
    finally
    {
      opsInProgressLock.unlock();
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    }
    cancelAllOperations(new CancelRequest(true, message));
@@ -1174,64 +1167,63 @@
    // We need to grab a lock to ensure that no one else can add operations to
    // the queue while we are performing some preliminary checks.
    opsInProgressLock.lock();
    try
    synchronized (opsInProgressLock)
    {
      // If we're already in the process of disconnecting the client, then
      // reject the operation.
      if (disconnectRequested)
      try
      {
        Message message = WARN_LDAP_CLIENT_DISCONNECT_IN_PROGRESS.get();
        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
        // If we're already in the process of disconnecting the client, then
        // reject the operation.
        if (disconnectRequested)
        {
          Message message = WARN_LDAP_CLIENT_DISCONNECT_IN_PROGRESS.get();
          throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                                       message);
        }
        // See if there is already an operation in progress with the same
        // message ID.  If so, then we can't allow it.
        AbstractOperation op = operationsInProgress.get(messageID);
        if (op != null)
        {
          Message message =
               WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
          throw new DirectoryException(ResultCode.PROTOCOL_ERROR, message);
        }
        // Add the operation to the list of operations in progress for this
        // connection.
        operationsInProgress.put(messageID, operation);
        // Try to add the operation to the work queue.
        DirectoryServer.enqueueRequest(operation);
      }
      // See if there is already an operation in progress with the same message
      // ID.  If so, then we can't allow it.
      AbstractOperation op = operationsInProgress.get(messageID);
      if (op != null)
      catch (DirectoryException de)
      {
        Message message = WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
        throw new DirectoryException(ResultCode.PROTOCOL_ERROR, message);
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, de);
        }
        operationsInProgress.remove(messageID);
        lastCompletionTime.set(TimeThread.getTime());
        throw de;
      }
      // Add the operation to the list of operations in progress for this
      // connection.
      operationsInProgress.put(messageID, operation);
      // Try to add the operation to the work queue.
      DirectoryServer.enqueueRequest(operation);
    }
    catch (DirectoryException de)
    {
      if (debugEnabled())
      catch (Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, de);
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        Message message =
            WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
        throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
                                     message, e);
      }
      operationsInProgress.remove(messageID);
      lastCompletionTime.set(TimeThread.getTime());
      throw de;
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      Message message =
          WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
      throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
                                   message, e);
    }
    finally
    {
      opsInProgressLock.unlock();
    }
  }
@@ -1322,84 +1314,11 @@
  public void cancelAllOperations(CancelRequest cancelRequest)
  {
    // Make sure that no one can add any new operations.
    opsInProgressLock.lock();
    try
    synchronized (opsInProgressLock)
    {
      for (AbstractOperation o : operationsInProgress.values())
      try
      {
        try
        {
          CancelResult cancelResult = o.cancel(cancelRequest);
          if (keepStats && (cancelResult == CancelResult.CANCELED))
          {
            statTracker.updateAbandonedOperation();
          }
        }
        catch (Exception e)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
        }
      }
      if (! (operationsInProgress.isEmpty() &&
             getPersistentSearches().isEmpty()))
      {
        lastCompletionTime.set(TimeThread.getTime());
      }
      operationsInProgress.clear();
      for (PersistentSearch persistentSearch : getPersistentSearches())
      {
        DirectoryServer.deregisterPersistentSearch(persistentSearch);
      }
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
    finally
    {
      opsInProgressLock.unlock();
    }
  }
  /**
   * Attempts to cancel all operations in progress on this connection except the
   * operation with the specified message ID.
   *
   * @param  cancelRequest  An object providing additional information about how
   *                        the cancel should be processed.
   * @param  messageID      The message ID of the operation that should not be
   *                        canceled.
   */
  public void cancelAllOperationsExcept(CancelRequest cancelRequest,
                                        int messageID)
  {
    // Make sure that no one can add any new operations.
    opsInProgressLock.lock();
    try
    {
      for (int msgID : operationsInProgress.keySet())
      {
        if (msgID == messageID)
        {
          continue;
        }
        AbstractOperation o = operationsInProgress.get(msgID);
        if (o != null)
        for (AbstractOperation o : operationsInProgress.values())
        {
          try
          {
@@ -1418,27 +1337,94 @@
          }
        }
        operationsInProgress.remove(msgID);
        lastCompletionTime.set(TimeThread.getTime());
      }
        if (! (operationsInProgress.isEmpty() &&
               getPersistentSearches().isEmpty()))
        {
          lastCompletionTime.set(TimeThread.getTime());
        }
        operationsInProgress.clear();
      for (PersistentSearch persistentSearch : getPersistentSearches())
        for (PersistentSearch persistentSearch : getPersistentSearches())
        {
          DirectoryServer.deregisterPersistentSearch(persistentSearch);
        }
      }
      catch (Exception e)
      {
        DirectoryServer.deregisterPersistentSearch(persistentSearch);
        lastCompletionTime.set(TimeThread.getTime());
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    }
    catch (Exception e)
  }
  /**
   * Attempts to cancel all operations in progress on this connection except the
   * operation with the specified message ID.
   *
   * @param  cancelRequest  An object providing additional information about how
   *                        the cancel should be processed.
   * @param  messageID      The message ID of the operation that should not be
   *                        canceled.
   */
  public void cancelAllOperationsExcept(CancelRequest cancelRequest,
                                        int messageID)
  {
    // Make sure that no one can add any new operations.
    synchronized (opsInProgressLock)
    {
      if (debugEnabled())
      try
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        for (int msgID : operationsInProgress.keySet())
        {
          if (msgID == messageID)
          {
            continue;
          }
          AbstractOperation o = operationsInProgress.get(msgID);
          if (o != null)
          {
            try
            {
              CancelResult cancelResult = o.cancel(cancelRequest);
              if (keepStats && (cancelResult == CancelResult.CANCELED))
              {
                statTracker.updateAbandonedOperation();
              }
            }
            catch (Exception e)
            {
              if (debugEnabled())
              {
                TRACER.debugCaught(DebugLogLevel.ERROR, e);
              }
            }
          }
          operationsInProgress.remove(msgID);
          lastCompletionTime.set(TimeThread.getTime());
        }
        for (PersistentSearch persistentSearch : getPersistentSearches())
        {
          DirectoryServer.deregisterPersistentSearch(persistentSearch);
          lastCompletionTime.set(TimeThread.getTime());
        }
      }
    }
    finally
    {
      opsInProgressLock.unlock();
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    }
  }