opends/resource/config/config.ldif
@@ -428,6 +428,7 @@ ds-cfg-allow-tcp-reuse-address: true ds-cfg-send-rejection-notice: true ds-cfg-max-request-size: 5 megabytes ds-cfg-buffer-size: 4096 bytes ds-cfg-max-blocked-write-time-limit: 2 minutes ds-cfg-num-request-handlers: 2 ds-cfg-allow-start-tls: false @@ -452,6 +453,7 @@ ds-cfg-allow-tcp-reuse-address: true ds-cfg-send-rejection-notice: true ds-cfg-max-request-size: 5 megabytes ds-cfg-buffer-size: 4096 bytes ds-cfg-max-blocked-write-time-limit: 2 minutes ds-cfg-num-request-handlers: 2 ds-cfg-allow-start-tls: false opends/resource/schema/02-config.ldif
@@ -2601,7 +2601,8 @@ ds-cfg-trust-manager-provider $ ds-cfg-ssl-protocol $ ds-cfg-ssl-cipher-suite $ ds-cfg-max-blocked-write-time-limit ) ds-cfg-max-blocked-write-time-limit $ ds-cfg-buffer-size ) X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.14 NAME 'ds-cfg-entry-cache' opends/src/admin/defn/org/opends/server/admin/std/LDAPConnectionHandlerConfiguration.xml
@@ -23,7 +23,7 @@ ! CDDL HEADER END ! ! ! Copyright 2007-2008 Sun Microsystems, Inc. ! Copyright 2007-2009 Sun Microsystems, Inc. ! --> <adm:managed-object name="ldap-connection-handler" plural-name="ldap-connection-handlers" @@ -438,6 +438,29 @@ </ldap:attribute> </adm:profile> </adm:property> <adm:property name="buffer-size" advanced="true"> <adm:synopsis> Specifies the size in bytes of the LDAP response message write buffer. </adm:synopsis> <adm:description> This property specifies write buffer size allocated by the server for each client connection and used to buffer LDAP response messages data when writing. </adm:description> <adm:default-behavior> <adm:defined> <adm:value>4096 bytes</adm:value> </adm:defined> </adm:default-behavior> <adm:syntax> <adm:size lower-limit="1b" upper-limit="2147483647b"></adm:size> </adm:syntax> <adm:profile name="ldap"> <ldap:attribute> <ldap:name>ds-cfg-buffer-size</ldap:name> </ldap:attribute> </adm:profile> </adm:property> <adm:property name="num-request-handlers" advanced="true"> <adm:synopsis> Specifies the number of request handlers that are used to read opends/src/server/org/opends/server/admin/AdministrationConnector.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.admin; @@ -129,6 +129,8 @@ private static final int ADMIN_MAX_REQUEST_SIZE = 5000000; // 5 Mb private static final int ADMIN_WRITE_BUFFER_SIZE = 4096; private static final int ADMIN_NUM_REQUEST_HANDLERS = 1; private static final boolean ADMIN_SEND_REJECTION_NOTICE = true; @@ -419,6 +421,16 @@ /** * {@inheritDoc} */ public long getBufferSize() { return ADMIN_WRITE_BUFFER_SIZE; } /** * {@inheritDoc} */ public int getNumRequestHandlers() { return ADMIN_NUM_REQUEST_HANDLERS; opends/src/server/org/opends/server/controls/EntryChangelogNotificationControl.java
@@ -145,7 +145,6 @@ writer.writeOctetString(cookie.toString()); writer.writeEndSequence(); writer.writeEndSequence(); writer.flush(); } opends/src/server/org/opends/server/protocols/asn1/ASN1ByteChannelWriter.java
@@ -32,6 +32,7 @@ import java.nio.channels.WritableByteChannel; import java.io.OutputStream; import java.io.IOException; import java.util.concurrent.locks.ReentrantLock; /** * This class is for writing ASN.1 elements directly to an @@ -50,6 +51,9 @@ // The NIO ByteStringBuilder to write to. private final ByteBuffer byteBuffer; // The lock to ensure atomic message flush. private final ReentrantLock flushLock; /** * An adaptor class provides a streaming interface to write to a * NIO ByteBuffer. This class is also responsible for writing @@ -65,7 +69,7 @@ if(!byteBuffer.hasRemaining()) { // No more space left in the buffer, send out to the channel. ASN1ByteChannelWriter.this.flush(); ASN1ByteChannelWriter.this.lockAndFlush(); } byteBuffer.put((byte)i); } @@ -95,7 +99,8 @@ { byteBuffer.put(bytes, i + i1 - bytesToWrite, len); bytesToWrite -= len; ASN1ByteChannelWriter.this.flush(); // No more space left in the buffer, send out to the channel. ASN1ByteChannelWriter.this.lockAndFlush(); } else { @@ -117,6 +122,7 @@ { this.byteChannel = byteChannel; this.byteBuffer = ByteBuffer.allocate(writeBufferSize); this.flushLock = new ReentrantLock(false); ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(); this.writer = new ASN1OutputStreamWriter(bufferStream); @@ -298,13 +304,52 @@ /** * Flush the entire contents of the NIO ByteBuffer out to the * channel. * channel. Note that the caller should only invoke this * method when it has finished writing the entire message and * not earlier. Otherwise, calling this method prematurely * while sharing this writer among several threads can cause * messages to interleave. While it is possible to ensure the * safety by either single threaded usage or using external * locking its generally not advised to do so and the caller * should instead obey no flush until the entire message is * written rule when using this writer. * * @throws IOException If an error occurs while flushing. */ public void flush() throws IOException { byteBuffer.flip(); try { if (!flushLock.isHeldByCurrentThread()) { flushLock.lock(); } while(byteBuffer.hasRemaining()) { byteChannel.write(byteBuffer); } } finally { flushLock.unlock(); } byteBuffer.clear(); } /** * Take the flush lock and flush the entire contents of * the NIO ByteBuffer out to the channel. * * @throws IOException If an error occurs while flushing. */ private void lockAndFlush() throws IOException { byteBuffer.flip(); if (!flushLock.isHeldByCurrentThread()) { flushLock.lock(); } while(byteBuffer.hasRemaining()) { byteChannel.write(byteBuffer); opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -181,10 +181,7 @@ // The number of operations performed on this connection. // Used to compare with the resource limits of the network group. private long operationsPerformed; // Lock on the number of operations private final Object operationsPerformedLock; private final AtomicLong operationsPerformed; // The port on the client from which this connection originated. private final int clientPort; @@ -216,10 +213,6 @@ // in progress. private final Object opsInProgressLock; // The lock used to provide threadsafe access when sending data to the // client. private final Object transmitLock; // The socket channel with which this client connection is associated. private final SocketChannel clientChannel; @@ -236,9 +229,7 @@ private ASN1ByteChannelReader asn1Reader; private ASN1Writer asn1Writer; private static int APPLICATION_BUFFER_SIZE = 4096; private int APPLICATION_BUFFER_SIZE = 4096; private final RedirectingByteChannel saslChannel; private final RedirectingByteChannel tlsChannel; @@ -246,6 +237,28 @@ private volatile ConnectionSecurityProvider tlsPendingProvider = null; private volatile ConnectionSecurityProvider saslPendingProvider = null; /** * Thread local storage to provide each worker thread with its * own ASN1Writer to allow for parallel, non-blocking encoding. */ private final ThreadLocal<ASN1Writer> threadLocalASN1Writer = new ThreadLocal<ASN1Writer>() { @Override protected ASN1Writer initialValue() { if (isSecure()) { int appBufSize = activeProvider.getAppBufSize(); return ASN1.getWriter(saslChannel, appBufSize); } else { return ASN1.getWriter(saslChannel, APPLICATION_BUFFER_SIZE); } } }; /** * Creates a new LDAP client connection with the provided information. @@ -270,7 +283,6 @@ this.clientChannel = clientChannel; opsInProgressLock = new Object(); transmitLock = new Object(); ldapVersion = 3; requestHandler = null; lastCompletionTime = new AtomicLong(TimeThread.getTime()); @@ -278,8 +290,7 @@ connectionValid = true; disconnectRequested = false; operationsInProgress = new ConcurrentHashMap<Integer, Operation>(); operationsPerformed = 0; operationsPerformedLock = new Object(); operationsPerformed = new AtomicLong(0); keepStats = connectionHandler.keepStats(); this.protocol = protocol; writeSelector = new AtomicReference<Selector>(); @@ -299,6 +310,8 @@ this.useNanoTime=DirectoryServer.getUseNanoTime(); } APPLICATION_BUFFER_SIZE = connectionHandler.getBufferSize(); tlsChannel = RedirectingByteChannel.getRedirectingByteChannel(clientChannel); saslChannel = @@ -794,42 +807,24 @@ */ public void sendLDAPMessage(LDAPMessage message) { // Get the buffer used by this thread. // Get the writer used by this thread. ASN1Writer asn1Writer = threadLocalASN1Writer.get(); try { // Make sure that we can only send one message at a time. This // locking will not have any impact on the ability to read // requests from the client. synchronized (transmitLock) message.write(asn1Writer); asn1Writer.flush(); if (debugEnabled()) { if (asn1Writer == null) { if (isSecure()) { int appBufSize = activeProvider.getAppBufSize(); asn1Writer = ASN1.getWriter(saslChannel, appBufSize); } else { asn1Writer = ASN1.getWriter(saslChannel, APPLICATION_BUFFER_SIZE); } } TRACER.debugProtocolElement(DebugLogLevel.VERBOSE, message.toString()); } message.write(asn1Writer); asn1Writer.flush(); if(debugEnabled()) { TRACER.debugProtocolElement(DebugLogLevel.VERBOSE, message.toString()); } if (keepStats) { // TODO hard-coded for now, flush probably needs to // return how many bytes were flushed. statTracker.updateMessageWritten(message, 4096); } if (keepStats) { // TODO hard-coded for now, flush probably needs to // return how many bytes were flushed. statTracker.updateMessageWritten(message, 4096); } } catch (Exception e) @@ -1078,9 +1073,12 @@ message); } // Add the operation to the list of operations in progress for // this connection. Operation op = operationsInProgress.putIfAbsent(messageID, operation); // See if there is already an operation in progress with the // same message ID. If so, then we can't allow it. Operation op = operationsInProgress.get(messageID); if (op != null) { Message message = @@ -1089,10 +1087,6 @@ message); } // Add the operation to the list of operations in progress for // this connection. operationsInProgress.put(messageID, operation); // Try to add the operation to the work queue, // or run it synchronously (typically for the administration // connector) @@ -1399,12 +1393,19 @@ @Override public long getNumberOfOperations() { long tmpNumberOfOperations; synchronized (operationsPerformedLock) { tmpNumberOfOperations = operationsPerformed; } return tmpNumberOfOperations; return operationsPerformed.get(); } /** * Returns the ASN1 reader for this connection. * * @return the ASN1 reader for this connection */ protected ASN1ByteChannelReader getASN1Reader() { return asn1Reader; } @@ -1412,61 +1413,40 @@ /** * Process data read. * * @return {@code true} if this connection is still valid. * @return number of bytes read if this connection is still valid * or negative integer to indicate an error otherwise */ public boolean processDataRead() public int processDataRead() { while (true) if (bindOrStartTLSInProgress.get()) { if(bindOrStartTLSInProgress.get()) { // We should wait for the bind or startTLS to finish before // reading any more data off the socket. return true; } // We should wait for the bind or startTLS to finish before // reading any more data off the socket. return 0; } try try { int result = asn1Reader.processChannelData(); if (result < 0) { int result = asn1Reader.processChannelData(); if (result < 0) { // The connection has been closed by the client. Disconnect // and return. disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null); return false; } if (result == 0) { // We have read all the data that there is to read right now // (or there wasn't any in the first place). Just return and // wait for future notification. return true; } else { // Decode all complete elements from the last read while (asn1Reader.elementAvailable()) { if (!processLDAPMessage(LDAPReader.readMessage(asn1Reader))) { // Fatal connection error - client disconnected. return false; } } } // The connection has been closed by the client. Disconnect // and return. disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null); return -1; } catch (Exception e) return result; } catch (Exception e) { if (debugEnabled()) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } Message m = ERR_LDAP_CLIENT_DECODE_LDAP_MESSAGE_FAILED.get(String .valueOf(e)); disconnect(DisconnectReason.PROTOCOL_ERROR, false, m); return false; TRACER.debugCaught(DebugLogLevel.ERROR, e); } Message m = ERR_LDAP_CLIENT_DECODE_LDAP_MESSAGE_FAILED.get(String.valueOf(e)); disconnect(DisconnectReason.PROTOCOL_ERROR, true, m); return -1; } } @@ -1485,17 +1465,14 @@ * error and the client has been disconnected as a result, or * if the client unbound from the server. */ private boolean processLDAPMessage(LDAPMessage message) public boolean processLDAPMessage(LDAPMessage message) { if (keepStats) { statTracker.updateMessageRead(message); this.getNetworkGroup().updateMessageRead(message); } synchronized (operationsPerformedLock) { operationsPerformed++; } operationsPerformed.getAndIncrement(); List<Control> opControls = message.getControls(); opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
@@ -601,6 +601,19 @@ /** * Retrieves the size in bytes of the LDAP response message * write buffer defined for this connection handler. * * @return The size in bytes of the LDAP response * message write buffer. */ public int getBufferSize() { return (int) currentConfig.getBufferSize(); } /** * {@inheritDoc} */ @Override opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java
@@ -40,15 +40,16 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import org.opends.messages.Message; import org.opends.server.api.DirectoryThread; import org.opends.server.api.ServerShutdownListener; import org.opends.server.core.DirectoryServer; import org.opends.server.loggers.ErrorLogger; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.protocols.asn1.ASN1ByteChannelReader; import org.opends.server.types.DebugLogLevel; import org.opends.server.types.DisconnectReason; import org.opends.server.types.InitializationException; @@ -71,16 +72,6 @@ */ private static final DebugTracer TRACER = getTracer(); /** * The buffer size in bytes to use when reading data from a client. */ public static final int BUFFER_SIZE = 8192; // Indicates whether the Directory Server is in the process of shutting down. private volatile boolean shutdownRequested = false; @@ -89,12 +80,17 @@ // The queue that will be used to hold the set of pending connections that // need to be registered with the selector. // TODO: revisit, see Issue 4202. private List<LDAPClientConnection> pendingConnections = new LinkedList<LDAPClientConnection>(); // Lock object for synchronizing access to the pending connections queue. private final Object pendingConnectionsLock = new Object(); // The list of connections ready for request processing. private LinkedList<LDAPClientConnection> readyConnections = new LinkedList<LDAPClientConnection>(); // The selector that will be used to monitor the client connections. private final Selector selector; @@ -180,6 +176,89 @@ // loop, check for new requests, then check for new connections. while (!shutdownRequested) { LDAPClientConnection readyConnection = null; while ((readyConnection = readyConnections.poll()) != null) { try { ASN1ByteChannelReader asn1Reader = readyConnection.getASN1Reader(); boolean ldapMessageProcessed = false; while (true) { if (asn1Reader.elementAvailable()) { if (!ldapMessageProcessed) { if (readyConnection.processLDAPMessage( LDAPReader.readMessage(asn1Reader))) { ldapMessageProcessed = true; } else { break; } } else { readyConnections.add(readyConnection); break; } } else { if (readyConnection.processDataRead() <= 0) { break; } } } } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } } // Check to see if we have any pending connections that need to be // registered with the selector. List<LDAPClientConnection> tmp = null; synchronized (pendingConnectionsLock) { if (!pendingConnections.isEmpty()) { tmp = pendingConnections; pendingConnections = new LinkedList<LDAPClientConnection>(); } } if (tmp != null) { for (LDAPClientConnection c : tmp) { try { SocketChannel socketChannel = c.getSocketChannel(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ, c); } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } c.disconnect(DisconnectReason.SERVER_ERROR, true, ERR_LDAP_REQHANDLER_CANNOT_REGISTER.get(handlerName, String.valueOf(e))); } } } // Create a copy of the selection keys which can be used in a // thread-safe manner by getClientConnections. This copy is only // updated once per loop, so may not be accurate. @@ -226,10 +305,14 @@ try { if (! clientConnection.processDataRead()) int readResult = clientConnection.processDataRead(); if (readResult < 0) { key.cancel(); } if (readResult > 0) { readyConnections.add(clientConnection); } } catch (Exception e) { @@ -307,43 +390,6 @@ } } } // Check to see if we have any pending connections that need to be // registered with the selector. List<LDAPClientConnection> tmp = null; synchronized (pendingConnectionsLock) { if (!pendingConnections.isEmpty()) { tmp = pendingConnections; pendingConnections = new LinkedList<LDAPClientConnection>(); } } if (tmp != null) { for (LDAPClientConnection c : tmp) { try { SocketChannel socketChannel = c.getSocketChannel(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ, c); } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } c.disconnect(DisconnectReason.SERVER_ERROR, true, ERR_LDAP_REQHANDLER_CANNOT_REGISTER.get(handlerName, String.valueOf(e))); } } } } // Disconnect all active connections. @@ -439,7 +485,6 @@ return false; } // Try to add the new connection to the queue. If it succeeds, then wake // up the selector so it will be picked up right away. Otherwise, // disconnect the client.