From bdb54bc45ce05fcb01e106a8e9b52db188579efd Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Wed, 25 Nov 2009 10:05:23 +0000
Subject: [PATCH] Fix issue 4360 - OpenDS does not answer anymore when doing 24+ simultaneous subtree searches without reading the response.

---
 opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java |   57 +-------
 opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java |   13 -
 opends/src/server/org/opends/server/extensions/TLSByteChannel.java            |   39 ++----
 opends/src/server/org/opends/server/extensions/SASLByteChannel.java           |   11 -
 opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java  |  188 ++++++++++++++++++++++++++----
 5 files changed, 191 insertions(+), 117 deletions(-)

diff --git a/opends/src/server/org/opends/server/extensions/SASLByteChannel.java b/opends/src/server/org/opends/server/extensions/SASLByteChannel.java
index 7348209..6159f49 100644
--- a/opends/src/server/org/opends/server/extensions/SASLByteChannel.java
+++ b/opends/src/server/org/opends/server/extensions/SASLByteChannel.java
@@ -31,10 +31,8 @@
 import java.security.cert.Certificate;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
 import javax.security.sasl.Sasl;
 import org.opends.server.api.ClientConnection;
-import org.opends.server.util.StaticUtils;
 
 /**
  * This class implements a SASL byte channel that can be used during
@@ -310,14 +308,7 @@
      *         to the socket channel, or, {@code false} if not.
      */
     private int writeChannel(ByteBuffer buffer) throws IOException {
-        int bytesWritten = channel.write(buffer);
-        if (bytesWritten < 0)
-            throw new ClosedChannelException();
-        else if (bytesWritten == 0) {
-            if(!StaticUtils.writeWithTimeout(connection, buffer))
-                throw new ClosedChannelException();
-        }
-        return bytesWritten;
+        return channel.write(buffer);
       }
 
     /**
diff --git a/opends/src/server/org/opends/server/extensions/TLSByteChannel.java b/opends/src/server/org/opends/server/extensions/TLSByteChannel.java
index a363aab..ff85170 100644
--- a/opends/src/server/org/opends/server/extensions/TLSByteChannel.java
+++ b/opends/src/server/org/opends/server/extensions/TLSByteChannel.java
@@ -26,24 +26,24 @@
  */
 package org.opends.server.extensions;
 
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ByteChannel;
 import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SocketChannel;
 import java.security.cert.Certificate;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
+
 import javax.net.ssl.*;
 
 import org.opends.server.admin.std.server.LDAPConnectionHandlerCfg;
 import org.opends.server.api.ClientConnection;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.types.DebugLogLevel;
-import org.opends.server.util.StaticUtils;
-
-import static org.opends.server.loggers.debug.DebugLogger.*;
 
 /**
  * A class that provides a TLS byte channel implementation.
@@ -54,19 +54,19 @@
 
     private static final DebugTracer TRACER = getTracer();
 
-    private ClientConnection connection;
-    private SocketChannel socketChannel;
+    private final ClientConnection connection;
+    private final ByteChannel socketChannel;
 
-    private SSLEngine sslEngine;
+    private final SSLEngine sslEngine;
 
     //read copy to buffer
-    private ByteBuffer appData;
+    private final ByteBuffer appData;
     //read encrypted
-    private ByteBuffer appNetData;
+    private final ByteBuffer appNetData;
 
     //Write encrypted
-    private ByteBuffer netData, tempData;
-    private int sslBufferSize, appBufSize;
+    private final ByteBuffer netData, tempData;
+    private final int sslBufferSize, appBufSize;
     private boolean reading = false;
 
     //Map of cipher phrases to effective key size (bits). Taken from the
@@ -92,7 +92,7 @@
     };
 
     private TLSByteChannel(LDAPConnectionHandlerCfg config, ClientConnection c,
-                         SocketChannel socketChannel, SSLContext sslContext)  {
+        ByteChannel socketChannel, SSLContext sslContext)  {
 
         this.socketChannel = socketChannel;
         this.connection = c;
@@ -159,7 +159,7 @@
      */
     public static TLSByteChannel
     getTLSByteChannel(LDAPConnectionHandlerCfg config, ClientConnection c,
-                        SSLContext sslContext, SocketChannel socketChannel) {
+                        SSLContext sslContext, ByteChannel socketChannel) {
         return new TLSByteChannel(config, c, socketChannel, sslContext);
     }
 
@@ -375,18 +375,7 @@
                     hsStatus == SSLEngineResult.HandshakeStatus.NEED_WRAP ||
                     hsStatus == SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
                 doHandshakeWrite(hsStatus);
-            while (netData.hasRemaining()) {
-                int bytesWritten = socketChannel.write(netData);
-                if (bytesWritten < 0)
-                    throw new ClosedChannelException();
-                else if (bytesWritten == 0) {
-                    int bytesSent = netData.remaining();
-                    if(!StaticUtils.writeWithTimeout(connection, netData))
-                        throw new ClosedChannelException();
-                    totBytesSent += bytesSent;
-                } else
-                    totBytesSent += bytesWritten;
-            }
+            totBytesSent += socketChannel.write(netData);
         }
         return totBytesSent;
     }
diff --git a/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java b/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
index 4683e50..db382ed 100644
--- a/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
+++ b/opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
@@ -26,14 +26,14 @@
  */
 package org.opends.server.protocols.asn1;
 
-import org.opends.server.types.ByteSequence;
-
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
-import java.io.OutputStream;
-import java.io.IOException;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.opends.server.types.ByteSequence;
+
 /**
  * This class is for writing ASN.1 elements directly to an
  * NIO WritableByteChannel with an embedded ByteBuffer.
@@ -325,10 +325,7 @@
       {
         flushLock.lock();
       }
-      while(byteBuffer.hasRemaining())
-      {
-        byteChannel.write(byteBuffer);
-      }
+      byteChannel.write(byteBuffer);
     }
     finally
     {
diff --git a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
index f808e41..96cdcfb 100644
--- a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
+++ b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -28,17 +28,21 @@
 
 
 
+import static org.opends.messages.CoreMessages.ERR_ENQUEUE_BIND_IN_PROGRESS;
 import static org.opends.messages.ProtocolMessages.*;
-import static org.opends.server.loggers.AccessLogger.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.loggers.AccessLogger.logDisconnect;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.protocols.ldap.LDAPConstants.*;
-import static org.opends.server.types.OperationType.*;
-import static org.opends.server.util.StaticUtils.*;
+import static org.opends.server.util.ServerConstants.OID_START_TLS_REQUEST;
+import static org.opends.server.util.StaticUtils.getExceptionMessage;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
+import java.io.IOException;
 import java.net.InetAddress;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
 import java.security.cert.Certificate;
 import java.util.Collection;
 import java.util.Iterator;
@@ -49,23 +53,9 @@
 
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
-import static org.opends.messages.CoreMessages.ERR_ENQUEUE_BIND_IN_PROGRESS;
 import org.opends.server.api.ClientConnection;
 import org.opends.server.api.ConnectionHandler;
-import org.opends.server.core.AbandonOperationBasis;
-import org.opends.server.core.AddOperationBasis;
-import org.opends.server.core.BindOperationBasis;
-import org.opends.server.core.CompareOperationBasis;
-import org.opends.server.core.DeleteOperationBasis;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.core.ExtendedOperationBasis;
-import org.opends.server.core.ModifyDNOperationBasis;
-import org.opends.server.core.ModifyOperationBasis;
-import org.opends.server.core.PersistentSearch;
-import org.opends.server.core.PluginConfigManager;
-import org.opends.server.core.SearchOperation;
-import org.opends.server.core.SearchOperationBasis;
-import org.opends.server.core.UnbindOperationBasis;
+import org.opends.server.core.*;
 import org.opends.server.core.networkgroups.NetworkGroup;
 import org.opends.server.extensions.ConnectionSecurityProvider;
 import org.opends.server.extensions.RedirectingByteChannel;
@@ -78,7 +68,6 @@
 import org.opends.server.protocols.asn1.ASN1Writer;
 import org.opends.server.types.*;
 import org.opends.server.util.TimeThread;
-import static org.opends.server.util.ServerConstants.OID_START_TLS_REQUEST;
 
 
 /**
@@ -148,6 +137,138 @@
     }
   }
 
+  /**
+   * Channel that writes the contents of the provided buffer to the client,
+   * throwing an exception if the write is unsuccessful for too
+   * long (e.g., if the client is unresponsive or there is a network
+   * problem). If possible, it will attempt to use the selector returned
+   * by the {@code ClientConnection.getWriteSelector} method, but it is
+   * capable of working even if that method returns {@code null}. <BR>
+   *
+   * Note that the original position and limit values will not be
+   * preserved, so if that is important to the caller, then it should
+   * record them before calling this method and restore them after it
+   * returns.
+   */
+  private class TimeoutWriteByteChannel implements ByteChannel
+  {
+    public int read(ByteBuffer byteBuffer) throws IOException
+    {
+      return clientChannel.read(byteBuffer);
+    }
+
+    public boolean isOpen()
+    {
+      return clientChannel.isOpen();
+    }
+
+    public void close() throws IOException
+    {
+      clientChannel.close();
+    }
+
+    public int write(ByteBuffer byteBuffer) throws IOException
+    {
+      int bytesToWrite = byteBuffer.remaining();
+      clientChannel.write(byteBuffer);
+      if(!byteBuffer.hasRemaining())
+      {
+        return bytesToWrite;
+      }
+      long startTime = System.currentTimeMillis();
+      long waitTime = getMaxBlockedWriteTimeLimit();
+      if (waitTime <= 0)
+      {
+        // We won't support an infinite time limit, so fall back to using
+        // five minutes, which is a very long timeout given that we're
+        // blocking a worker thread.
+        waitTime = 300000L;
+      }
+
+      long stopTime = startTime + waitTime;
+
+      Selector selector = getWriteSelector();
+      if (selector == null)
+      {
+        // The client connection does not provide a selector, so we'll
+        // fall back
+        // to a more inefficient way that will work without a selector.
+        while (byteBuffer.hasRemaining()
+               && (System.currentTimeMillis() < stopTime))
+        {
+          if (clientChannel.write(byteBuffer) < 0)
+          {
+            // The client connection has been closed.
+            throw new ClosedChannelException();
+          }
+        }
+
+        if (byteBuffer.hasRemaining())
+        {
+          // If we've gotten here, then the write timed out.
+          throw new ClosedChannelException();
+        }
+
+        return bytesToWrite;
+      }
+
+      // Register with the selector for handling write operations.
+      SelectionKey key =
+          clientChannel.register(selector, SelectionKey.OP_WRITE);
+
+      try
+      {
+        selector.select(waitTime);
+        while (byteBuffer.hasRemaining())
+        {
+          long currentTime = System.currentTimeMillis();
+          if (currentTime >= stopTime)
+          {
+            // We've been blocked for too long.
+            throw new ClosedChannelException();
+          }
+          else
+          {
+            waitTime = stopTime - currentTime;
+          }
+
+          Iterator<SelectionKey> iterator =
+              selector.selectedKeys().iterator();
+          while (iterator.hasNext())
+          {
+            SelectionKey k = iterator.next();
+            if (k.isWritable())
+            {
+              int bytesWritten = clientChannel.write(byteBuffer);
+              if (bytesWritten < 0)
+              {
+                // The client connection has been closed.
+                throw new ClosedChannelException();
+              }
+
+              iterator.remove();
+            }
+          }
+
+          if (byteBuffer.hasRemaining())
+          {
+            selector.select(waitTime);
+          }
+        }
+
+        return bytesToWrite;
+      }
+      finally
+      {
+        if (key.isValid())
+        {
+          key.cancel();
+          selector.selectNow();
+        }
+      }
+    }
+  }
+
 
   /**
    * The tracer object for the debug logger.
@@ -216,6 +337,9 @@
   // The socket channel with which this client connection is associated.
   private final SocketChannel clientChannel;
 
+  // The byte channel used for blocking writes with time out.
+  private final ByteChannel timeoutClientChannel;
+
   // The string representation of the address of the client.
   private final String clientAddress;
 
@@ -269,12 +393,11 @@
    *          The socket channel that may be used to communicate with
    *          the client.
    * @param  protocol String representing the protocol (LDAP or LDAP+SSL).
+   * @throws DirectoryException If SSL initialisation fails.
    */
   public LDAPClientConnection(LDAPConnectionHandler connectionHandler,
-      SocketChannel clientChannel, String protocol)
+      SocketChannel clientChannel, String protocol) throws DirectoryException
   {
-    super();
-
     this.connectionHandler = connectionHandler;
     if (connectionHandler.isAdminConnectionHandler())
     {
@@ -282,6 +405,7 @@
     }
 
     this.clientChannel = clientChannel;
+    timeoutClientChannel = new TimeoutWriteByteChannel();
     opsInProgressLock = new Object();
     ldapVersion = 3;
     requestHandler = null;
@@ -313,13 +437,21 @@
     APPLICATION_BUFFER_SIZE = connectionHandler.getBufferSize();
 
     tlsChannel =
-        RedirectingByteChannel.getRedirectingByteChannel(clientChannel);
+        RedirectingByteChannel.getRedirectingByteChannel(
+            timeoutClientChannel);
     saslChannel =
         RedirectingByteChannel.getRedirectingByteChannel(tlsChannel);
     this.asn1Reader =
         ASN1.getReader(saslChannel, APPLICATION_BUFFER_SIZE, connectionHandler
             .getMaxRequestSize());
 
+
+    if (connectionHandler.useSSL())
+    {
+      enableSSL(connectionHandler.getTLSByteChannel(this,
+          timeoutClientChannel));
+    }
+
     connectionID = DirectoryServer.newConnectionAccepted(this);
     if (connectionID < 0)
     {
@@ -2349,7 +2481,7 @@
     try
     {
       TLSByteChannel tlsByteChannel =
-          connectionHandler.getTLSByteChannel(this, clientChannel);
+          connectionHandler.getTLSByteChannel(this, timeoutClientChannel);
       setTLSPendingProvider(tlsByteChannel);
     }
     catch (DirectoryException de)
diff --git a/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java b/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
index 97dd795..862a01f 100644
--- a/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
+++ b/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
@@ -25,18 +25,10 @@
  *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.protocols.ldap;
-import org.opends.messages.Message;
-
-
-
-import static org.opends.server.loggers.AccessLogger.logConnect;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.monitors.ClientConnectionMonitorProvider;
-
 import static org.opends.messages.ProtocolMessages.*;
-
+import static org.opends.server.loggers.AccessLogger.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.opends.server.util.ServerConstants.*;
 import static org.opends.server.util.StaticUtils.*;
 
@@ -44,35 +36,21 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
+import java.nio.channels.*;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.SSLContext;
 
+import org.opends.messages.Message;
 import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.std.server.ConnectionHandlerCfg;
 import org.opends.server.admin.std.server.LDAPConnectionHandlerCfg;
-import org.opends.server.api.AlertGenerator;
-import org.opends.server.api.ClientConnection;
-import org.opends.server.api.ConnectionHandler;
-import org.opends.server.api.DirectoryThread;
-import org.opends.server.api.KeyManagerProvider;
-import org.opends.server.api.ServerShutdownListener;
-import org.opends.server.api.TrustManagerProvider;
+import org.opends.server.api.*;
 import org.opends.server.api.plugin.PluginResult;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
@@ -82,18 +60,9 @@
 import org.opends.server.extensions.NullKeyManagerProvider;
 import org.opends.server.extensions.NullTrustManagerProvider;
 import org.opends.server.extensions.TLSByteChannel;
-import org.opends.server.types.AddressMask;
-import org.opends.server.types.ConfigChangeResult;
-import org.opends.server.types.DN;
-import org.opends.server.types.DebugLogLevel;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.DisconnectReason;
-
-
-import org.opends.server.types.HostPort;
-import org.opends.server.types.InitializationException;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.SSLClientAuthPolicy;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.monitors.ClientConnectionMonitorProvider;
+import org.opends.server.types.*;
 import org.opends.server.util.SelectableCertificateKeyManager;
 import org.opends.server.util.StaticUtils;
 
@@ -1293,10 +1262,6 @@
       }
       LDAPClientConnection c = new LDAPClientConnection(this, socketChannel,
                                                         getProtocol());
-      if(currentConfig.isUseSSL()) {
-          TLSByteChannel tlsByteChannel =  getTLSByteChannel(c, socketChannel);
-          c.enableSSL(tlsByteChannel);
-      }
       return c;
   }
 
@@ -1310,7 +1275,7 @@
    * @throws DirectoryException If the channel cannot be created.
    */
   public TLSByteChannel
-  getTLSByteChannel(LDAPClientConnection c, SocketChannel socketChannel)
+  getTLSByteChannel(LDAPClientConnection c, ByteChannel socketChannel)
                   throws DirectoryException {
          return(TLSByteChannel.getTLSByteChannel(currentConfig, c,
                                                     sslContext,

--
Gitblit v1.10.0