From 2bbfdcf54303286798114aee270531f65c9c3a7c Mon Sep 17 00:00:00 2001
From: abobrov <abobrov@localhost>
Date: Fri, 09 Oct 2009 01:20:46 +0000
Subject: [PATCH] - CMT improvements to parallel request and response processing.

---
 opends/resource/schema/02-config.ldif                                                    |    3 
 opends/src/server/org/opends/server/admin/AdministrationConnector.java                   |   14 +
 opends/src/admin/defn/org/opends/server/admin/std/LDAPConnectionHandlerConfiguration.xml |   25 +++
 opends/src/server/org/opends/server/controls/EntryChangelogNotificationControl.java      |    1 
 opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java            |   13 +
 opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java               |  145 +++++++++++------
 opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java            |   51 ++++++
 opends/resource/config/config.ldif                                                       |    2 
 opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java             |  195 ++++++++++-------------
 9 files changed, 283 insertions(+), 166 deletions(-)

diff --git a/opends/resource/config/config.ldif b/opends/resource/config/config.ldif
index 0d2490a..babea19 100644
--- a/opends/resource/config/config.ldif
+++ b/opends/resource/config/config.ldif
@@ -428,6 +428,7 @@
 ds-cfg-allow-tcp-reuse-address: true
 ds-cfg-send-rejection-notice: true
 ds-cfg-max-request-size: 5 megabytes
+ds-cfg-buffer-size: 4096 bytes
 ds-cfg-max-blocked-write-time-limit: 2 minutes
 ds-cfg-num-request-handlers: 2
 ds-cfg-allow-start-tls: false
@@ -452,6 +453,7 @@
 ds-cfg-allow-tcp-reuse-address: true
 ds-cfg-send-rejection-notice: true
 ds-cfg-max-request-size: 5 megabytes
+ds-cfg-buffer-size: 4096 bytes
 ds-cfg-max-blocked-write-time-limit: 2 minutes
 ds-cfg-num-request-handlers: 2
 ds-cfg-allow-start-tls: false
diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index f1f179d..12f31ef 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -2601,7 +2601,8 @@
         ds-cfg-trust-manager-provider $
         ds-cfg-ssl-protocol $
         ds-cfg-ssl-cipher-suite $
-        ds-cfg-max-blocked-write-time-limit )
+        ds-cfg-max-blocked-write-time-limit $
+        ds-cfg-buffer-size )
   X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.14
   NAME 'ds-cfg-entry-cache'
diff --git a/opends/src/admin/defn/org/opends/server/admin/std/LDAPConnectionHandlerConfiguration.xml b/opends/src/admin/defn/org/opends/server/admin/std/LDAPConnectionHandlerConfiguration.xml
index 287471e..c4e1ac1 100644
--- a/opends/src/admin/defn/org/opends/server/admin/std/LDAPConnectionHandlerConfiguration.xml
+++ b/opends/src/admin/defn/org/opends/server/admin/std/LDAPConnectionHandlerConfiguration.xml
@@ -23,7 +23,7 @@
   ! CDDL HEADER END
   !
   !
-  !      Copyright 2007-2008 Sun Microsystems, Inc.
+  !      Copyright 2007-2009 Sun Microsystems, Inc.
   ! -->
 <adm:managed-object name="ldap-connection-handler"
   plural-name="ldap-connection-handlers"
@@ -438,6 +438,29 @@
       </ldap:attribute>
     </adm:profile>
   </adm:property>
+  <adm:property name="buffer-size" advanced="true">
+    <adm:synopsis>
+      Specifies the size in bytes of the LDAP response message write buffer.
+    </adm:synopsis>
+    <adm:description>
+      This property specifies write buffer size allocated by the server for
+      each client connection and used to buffer LDAP response messages data
+      when writing.
+    </adm:description>
+    <adm:default-behavior>
+      <adm:defined>
+        <adm:value>4096 bytes</adm:value>
+      </adm:defined>
+    </adm:default-behavior>
+    <adm:syntax>
+      <adm:size lower-limit="1b" upper-limit="2147483647b"></adm:size>
+    </adm:syntax>
+    <adm:profile name="ldap">
+      <ldap:attribute>
+        <ldap:name>ds-cfg-buffer-size</ldap:name>
+      </ldap:attribute>
+    </adm:profile>
+  </adm:property>
   <adm:property name="num-request-handlers" advanced="true">
     <adm:synopsis>
       Specifies the number of request handlers that are used to read
diff --git a/opends/src/server/org/opends/server/admin/AdministrationConnector.java b/opends/src/server/org/opends/server/admin/AdministrationConnector.java
index f819898..58c5bc2 100644
--- a/opends/src/server/org/opends/server/admin/AdministrationConnector.java
+++ b/opends/src/server/org/opends/server/admin/AdministrationConnector.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.admin;
 
@@ -129,6 +129,8 @@
 
   private static final int ADMIN_MAX_REQUEST_SIZE = 5000000; // 5 Mb
 
+  private static final int ADMIN_WRITE_BUFFER_SIZE = 4096;
+
   private static final int ADMIN_NUM_REQUEST_HANDLERS = 1;
 
   private static final boolean ADMIN_SEND_REJECTION_NOTICE = true;
@@ -419,6 +421,16 @@
     /**
      * {@inheritDoc}
      */
+    public long getBufferSize()
+    {
+      return ADMIN_WRITE_BUFFER_SIZE;
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
     public int getNumRequestHandlers()
     {
       return ADMIN_NUM_REQUEST_HANDLERS;
diff --git a/opends/src/server/org/opends/server/controls/EntryChangelogNotificationControl.java b/opends/src/server/org/opends/server/controls/EntryChangelogNotificationControl.java
index 7001fc2..4044fd4 100644
--- a/opends/src/server/org/opends/server/controls/EntryChangelogNotificationControl.java
+++ b/opends/src/server/org/opends/server/controls/EntryChangelogNotificationControl.java
@@ -145,7 +145,6 @@
     writer.writeOctetString(cookie.toString());
     writer.writeEndSequence();
     writer.writeEndSequence();
-    writer.flush();
   }
 
 
diff --git a/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java b/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
index 98c6aff..4683e50 100644
--- a/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
+++ b/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
@@ -32,6 +32,7 @@
 import java.nio.channels.WritableByteChannel;
 import java.io.OutputStream;
 import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * This class is for writing ASN.1 elements directly to an
@@ -50,6 +51,9 @@
   // The NIO ByteStringBuilder to write to.
   private final ByteBuffer byteBuffer;
 
+  // The lock to ensure atomic message flush.
+  private final ReentrantLock flushLock;
+
   /**
    * An adaptor class provides a streaming interface to write to a
    * NIO ByteBuffer. This class is also responsible for writing
@@ -65,7 +69,7 @@
       if(!byteBuffer.hasRemaining())
       {
         // No more space left in the buffer, send out to the channel.
-        ASN1ByteChannelWriter.this.flush();
+        ASN1ByteChannelWriter.this.lockAndFlush();
       }
       byteBuffer.put((byte)i);
     }
@@ -95,7 +99,8 @@
         {
           byteBuffer.put(bytes, i + i1 - bytesToWrite, len);
           bytesToWrite -= len;
-          ASN1ByteChannelWriter.this.flush();
+          // No more space left in the buffer, send out to the channel.
+          ASN1ByteChannelWriter.this.lockAndFlush();
         }
         else
         {
@@ -117,6 +122,7 @@
   {
     this.byteChannel = byteChannel;
     this.byteBuffer = ByteBuffer.allocate(writeBufferSize);
+    this.flushLock = new ReentrantLock(false);
 
     ByteBufferOutputStream bufferStream = new ByteBufferOutputStream();
     this.writer = new ASN1OutputStreamWriter(bufferStream);
@@ -298,13 +304,52 @@
 
   /**
    * Flush the entire contents of the NIO ByteBuffer out to the
-   * channel.
+   * channel.  Note that the caller should only invoke this
+   * method when it has finished writing the entire message and
+   * not earlier. Otherwise, calling this method prematurely
+   * while sharing this writer among several threads can cause
+   * messages to interleave. While it is possible to ensure the
+   * safety by either single threaded usage or using external
+   * locking its generally not advised to do so and the caller
+   * should instead obey no flush until the entire message is
+   * written rule when using this writer.
    *
    * @throws IOException If an error occurs while flushing.
    */
   public void flush() throws IOException
   {
     byteBuffer.flip();
+    try
+    {
+      if (!flushLock.isHeldByCurrentThread())
+      {
+        flushLock.lock();
+      }
+      while(byteBuffer.hasRemaining())
+      {
+        byteChannel.write(byteBuffer);
+      }
+    }
+    finally
+    {
+      flushLock.unlock();
+    }
+    byteBuffer.clear();
+  }
+
+  /**
+   * Take the flush lock and flush the entire contents of
+   * the NIO ByteBuffer out to the channel.
+   *
+   * @throws IOException If an error occurs while flushing.
+   */
+  private void lockAndFlush() throws IOException
+  {
+    byteBuffer.flip();
+    if (!flushLock.isHeldByCurrentThread())
+    {
+      flushLock.lock();
+    }
     while(byteBuffer.hasRemaining())
     {
       byteChannel.write(byteBuffer);
diff --git a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
index b14755b..f808e41 100644
--- a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
+++ b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -181,10 +181,7 @@
 
   // The number of operations performed on this connection.
   // Used to compare with the resource limits of the network group.
-  private long operationsPerformed;
-
-  // Lock on the number of operations
-  private final Object operationsPerformedLock;
+  private final AtomicLong operationsPerformed;
 
   // The port on the client from which this connection originated.
   private final int clientPort;
@@ -216,10 +213,6 @@
   // in progress.
   private final Object opsInProgressLock;
 
-  // The lock used to provide threadsafe access when sending data to the
-  // client.
-  private final Object transmitLock;
-
   // The socket channel with which this client connection is associated.
   private final SocketChannel clientChannel;
 
@@ -236,9 +229,7 @@
 
   private ASN1ByteChannelReader asn1Reader;
 
-  private ASN1Writer asn1Writer;
-
-  private static int APPLICATION_BUFFER_SIZE = 4096;
+  private int APPLICATION_BUFFER_SIZE = 4096;
 
   private final RedirectingByteChannel saslChannel;
   private final RedirectingByteChannel tlsChannel;
@@ -246,6 +237,28 @@
   private volatile ConnectionSecurityProvider tlsPendingProvider = null;
   private volatile ConnectionSecurityProvider saslPendingProvider = null;
 
+  /**
+   * Thread local storage to provide each worker thread with its
+   * own ASN1Writer to allow for parallel, non-blocking encoding.
+   */
+  private final ThreadLocal<ASN1Writer> threadLocalASN1Writer =
+          new ThreadLocal<ASN1Writer>()
+  {
+    @Override
+    protected ASN1Writer initialValue()
+    {
+      if (isSecure())
+      {
+        int appBufSize = activeProvider.getAppBufSize();
+        return ASN1.getWriter(saslChannel, appBufSize);
+      }
+      else
+      {
+        return ASN1.getWriter(saslChannel,
+          APPLICATION_BUFFER_SIZE);
+      }
+    }
+  };
 
   /**
    * Creates a new LDAP client connection with the provided information.
@@ -270,7 +283,6 @@
 
     this.clientChannel = clientChannel;
     opsInProgressLock = new Object();
-    transmitLock = new Object();
     ldapVersion = 3;
     requestHandler = null;
     lastCompletionTime = new AtomicLong(TimeThread.getTime());
@@ -278,8 +290,7 @@
     connectionValid = true;
     disconnectRequested = false;
     operationsInProgress = new ConcurrentHashMap<Integer, Operation>();
-    operationsPerformed = 0;
-    operationsPerformedLock = new Object();
+    operationsPerformed = new AtomicLong(0);
     keepStats = connectionHandler.keepStats();
     this.protocol = protocol;
     writeSelector = new AtomicReference<Selector>();
@@ -299,6 +310,8 @@
       this.useNanoTime=DirectoryServer.getUseNanoTime();
     }
 
+    APPLICATION_BUFFER_SIZE = connectionHandler.getBufferSize();
+
     tlsChannel =
         RedirectingByteChannel.getRedirectingByteChannel(clientChannel);
     saslChannel =
@@ -794,42 +807,24 @@
    */
   public void sendLDAPMessage(LDAPMessage message)
   {
-    // Get the buffer used by this thread.
+    // Get the writer used by this thread.
+    ASN1Writer asn1Writer = threadLocalASN1Writer.get();
     try
     {
-      // Make sure that we can only send one message at a time. This
-      // locking will not have any impact on the ability to read
-      // requests from the client.
-      synchronized (transmitLock)
+      message.write(asn1Writer);
+      asn1Writer.flush();
+
+      if (debugEnabled())
       {
-        if (asn1Writer == null)
-        {
-          if (isSecure())
-          {
-            int appBufSize = activeProvider.getAppBufSize();
-            asn1Writer = ASN1.getWriter(saslChannel, appBufSize);
-          }
-          else
-          {
-            asn1Writer =
-                ASN1.getWriter(saslChannel, APPLICATION_BUFFER_SIZE);
-          }
-        }
+        TRACER.debugProtocolElement(DebugLogLevel.VERBOSE,
+          message.toString());
+      }
 
-        message.write(asn1Writer);
-        asn1Writer.flush();
-        if(debugEnabled())
-        {
-          TRACER.debugProtocolElement(DebugLogLevel.VERBOSE,
-              message.toString());
-        }
-
-        if (keepStats)
-        {
-          // TODO hard-coded for now, flush probably needs to
-          // return how many bytes were flushed.
-          statTracker.updateMessageWritten(message, 4096);
-        }
+      if (keepStats)
+      {
+        // TODO hard-coded for now, flush probably needs to
+        // return how many bytes were flushed.
+        statTracker.updateMessageWritten(message, 4096);
       }
     }
     catch (Exception e)
@@ -1078,9 +1073,12 @@
               message);
         }
 
+        // Add the operation to the list of operations in progress for
+        // this connection.
+        Operation op = operationsInProgress.putIfAbsent(messageID, operation);
+
         // See if there is already an operation in progress with the
         // same message ID. If so, then we can't allow it.
-        Operation op = operationsInProgress.get(messageID);
         if (op != null)
         {
           Message message =
@@ -1089,10 +1087,6 @@
               message);
         }
 
-        // Add the operation to the list of operations in progress for
-        // this connection.
-        operationsInProgress.put(messageID, operation);
-
         // Try to add the operation to the work queue,
         // or run it synchronously (typically for the administration
         // connector)
@@ -1399,12 +1393,19 @@
   @Override
   public long getNumberOfOperations()
   {
-    long tmpNumberOfOperations;
-    synchronized (operationsPerformedLock)
-    {
-      tmpNumberOfOperations = operationsPerformed;
-    }
-    return tmpNumberOfOperations;
+    return operationsPerformed.get();
+  }
+
+
+
+  /**
+   * Returns the ASN1 reader for this connection.
+   *
+   * @return the ASN1 reader for this connection
+   */
+  protected ASN1ByteChannelReader getASN1Reader()
+  {
+    return asn1Reader;
   }
 
 
@@ -1412,61 +1413,40 @@
   /**
    * Process data read.
    *
-   * @return {@code true} if this connection is still valid.
+   * @return number of bytes read if this connection is still valid
+   *         or negative integer to indicate an error otherwise
    */
-  public boolean processDataRead()
+  public int processDataRead()
   {
-    while (true)
+    if (bindOrStartTLSInProgress.get())
     {
-      if(bindOrStartTLSInProgress.get())
-      {
-        // We should wait for the bind or startTLS to finish before
-        // reading any more data off the socket.
-        return true;
-      }
+      // We should wait for the bind or startTLS to finish before
+      // reading any more data off the socket.
+      return 0;
+    }
 
-      try
+    try
+    {
+      int result = asn1Reader.processChannelData();
+      if (result < 0)
       {
-        int result = asn1Reader.processChannelData();
-        if (result < 0)
-        {
-          // The connection has been closed by the client. Disconnect
-          // and return.
-          disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
-          return false;
-        }
-        if (result == 0)
-        {
-          // We have read all the data that there is to read right now
-          // (or there wasn't any in the first place). Just return and
-          // wait for future notification.
-          return true;
-        }
-        else
-        {
-          // Decode all complete elements from the last read
-          while (asn1Reader.elementAvailable())
-          {
-            if (!processLDAPMessage(LDAPReader.readMessage(asn1Reader)))
-            {
-              // Fatal connection error - client disconnected.
-              return false;
-            }
-          }
-        }
+        // The connection has been closed by the client. Disconnect
+        // and return.
+        disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
+        return -1;
       }
-      catch (Exception e)
+      return result;
+    }
+    catch (Exception e)
+    {
+      if (debugEnabled())
       {
-        if (debugEnabled())
-        {
-          TRACER.debugCaught(DebugLogLevel.ERROR, e);
-        }
-        Message m =
-            ERR_LDAP_CLIENT_DECODE_LDAP_MESSAGE_FAILED.get(String
-                .valueOf(e));
-        disconnect(DisconnectReason.PROTOCOL_ERROR, false, m);
-        return false;
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
       }
+      Message m =
+        ERR_LDAP_CLIENT_DECODE_LDAP_MESSAGE_FAILED.get(String.valueOf(e));
+      disconnect(DisconnectReason.PROTOCOL_ERROR, true, m);
+      return -1;
     }
   }
 
@@ -1485,17 +1465,14 @@
    *         error and the client has been disconnected as a result, or
    *         if the client unbound from the server.
    */
-  private boolean processLDAPMessage(LDAPMessage message)
+  public boolean processLDAPMessage(LDAPMessage message)
   {
     if (keepStats)
     {
       statTracker.updateMessageRead(message);
       this.getNetworkGroup().updateMessageRead(message);
     }
-    synchronized (operationsPerformedLock)
-    {
-      operationsPerformed++;
-    }
+    operationsPerformed.getAndIncrement();
 
     List<Control> opControls = message.getControls();
 
diff --git a/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java b/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
index 47fbb80..97dd795 100644
--- a/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
+++ b/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
@@ -601,6 +601,19 @@
 
 
   /**
+   * Retrieves the size in bytes of the LDAP response message
+   * write buffer defined for this connection handler.
+   *
+   * @return The size in bytes of the LDAP response
+   *         message write buffer.
+   */
+  public int getBufferSize() {
+    return (int) currentConfig.getBufferSize();
+  }
+
+
+
+  /**
    * {@inheritDoc}
    */
   @Override
diff --git a/opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java b/opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java
index 0301ec4..528679e 100644
--- a/opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java
+++ b/opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java
@@ -40,15 +40,16 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+
 import java.util.LinkedList;
 import java.util.List;
-
 import org.opends.messages.Message;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.api.ServerShutdownListener;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.loggers.ErrorLogger;
 import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.protocols.asn1.ASN1ByteChannelReader;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.types.DisconnectReason;
 import org.opends.server.types.InitializationException;
@@ -71,16 +72,6 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-
-
-
-  /**
-   * The buffer size in bytes to use when reading data from a client.
-   */
-  public static final int BUFFER_SIZE = 8192;
-
-
-
   // Indicates whether the Directory Server is in the process of shutting down.
   private volatile boolean shutdownRequested = false;
 
@@ -89,12 +80,17 @@
 
   // The queue that will be used to hold the set of pending connections that
   // need to be registered with the selector.
+  // TODO: revisit, see Issue 4202.
   private List<LDAPClientConnection> pendingConnections =
     new LinkedList<LDAPClientConnection>();
 
   // Lock object for synchronizing access to the pending connections queue.
   private final Object pendingConnectionsLock = new Object();
 
+  // The list of connections ready for request processing.
+  private LinkedList<LDAPClientConnection> readyConnections =
+    new LinkedList<LDAPClientConnection>();
+
   // The selector that will be used to monitor the client connections.
   private final Selector selector;
 
@@ -180,6 +176,89 @@
     // loop, check for new requests, then check for new connections.
     while (!shutdownRequested)
     {
+      LDAPClientConnection readyConnection = null;
+      while ((readyConnection = readyConnections.poll()) != null)
+      {
+        try
+        {
+          ASN1ByteChannelReader asn1Reader = readyConnection.getASN1Reader();
+          boolean ldapMessageProcessed = false;
+          while (true)
+          {
+            if (asn1Reader.elementAvailable())
+            {
+              if (!ldapMessageProcessed)
+              {
+                if (readyConnection.processLDAPMessage(
+                    LDAPReader.readMessage(asn1Reader)))
+                {
+                  ldapMessageProcessed = true;
+                }
+                else
+                {
+                  break;
+                }
+              }
+              else
+              {
+                readyConnections.add(readyConnection);
+                break;
+              }
+            }
+            else
+            {
+              if (readyConnection.processDataRead() <= 0)
+              {
+                break;
+              }
+            }
+          }
+        }
+        catch (Exception e)
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
+        }
+      }
+
+      // Check to see if we have any pending connections that need to be
+      // registered with the selector.
+      List<LDAPClientConnection> tmp = null;
+      synchronized (pendingConnectionsLock)
+      {
+        if (!pendingConnections.isEmpty())
+        {
+          tmp = pendingConnections;
+          pendingConnections = new LinkedList<LDAPClientConnection>();
+        }
+      }
+
+      if (tmp != null)
+      {
+        for (LDAPClientConnection c : tmp)
+        {
+          try
+          {
+            SocketChannel socketChannel = c.getSocketChannel();
+            socketChannel.configureBlocking(false);
+            socketChannel.register(selector, SelectionKey.OP_READ, c);
+          }
+          catch (Exception e)
+          {
+            if (debugEnabled())
+            {
+              TRACER.debugCaught(DebugLogLevel.ERROR, e);
+            }
+
+            c.disconnect(DisconnectReason.SERVER_ERROR, true,
+                ERR_LDAP_REQHANDLER_CANNOT_REGISTER.get(handlerName,
+                    String.valueOf(e)));
+          }
+        }
+      }
+
       // Create a copy of the selection keys which can be used in a
       // thread-safe manner by getClientConnections. This copy is only
       // updated once per loop, so may not be accurate.
@@ -226,10 +305,14 @@
 
                 try
                 {
-                  if (! clientConnection.processDataRead())
+                  int readResult = clientConnection.processDataRead();
+                  if (readResult < 0)
                   {
                     key.cancel();
                   }
+                  if (readResult > 0) {
+                    readyConnections.add(clientConnection);
+                  }
                 }
                 catch (Exception e)
                 {
@@ -307,43 +390,6 @@
           }
         }
       }
-
-
-      // Check to see if we have any pending connections that need to be
-      // registered with the selector.
-      List<LDAPClientConnection> tmp = null;
-      synchronized (pendingConnectionsLock)
-      {
-        if (!pendingConnections.isEmpty())
-        {
-          tmp = pendingConnections;
-          pendingConnections = new LinkedList<LDAPClientConnection>();
-        }
-      }
-
-      if (tmp != null)
-      {
-        for (LDAPClientConnection c : tmp)
-        {
-          try
-          {
-            SocketChannel socketChannel = c.getSocketChannel();
-            socketChannel.configureBlocking(false);
-            socketChannel.register(selector, SelectionKey.OP_READ, c);
-          }
-          catch (Exception e)
-          {
-            if (debugEnabled())
-            {
-              TRACER.debugCaught(DebugLogLevel.ERROR, e);
-            }
-
-            c.disconnect(DisconnectReason.SERVER_ERROR, true,
-                ERR_LDAP_REQHANDLER_CANNOT_REGISTER.get(handlerName,
-                    String.valueOf(e)));
-          }
-        }
-      }
     }
 
     // Disconnect all active connections.
@@ -439,7 +485,6 @@
       return false;
     }
 
-
     // Try to add the new connection to the queue.  If it succeeds, then wake
     // up the selector so it will be picked up right away.  Otherwise,
     // disconnect the client.

--
Gitblit v1.10.0