OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
* Upgrade grizzly to 2.3.28
* Use HeapBufferManager for client SDK (ThreadLocal based)
* Use PooledMemoryManager for server listener (allows to pool buffer
acrros threads)
* Fix Buffer management in LdapCodec / LDAPClientFilter / ASN1Buffer*
* Fix race-condition on bind in progress detection
| | |
| | | * The maximum BER element size, or <code>0</code> to indicate |
| | | * that there is no limit. |
| | | * @param buffer |
| | | * The buffer where the content will be read from. |
| | | * The buffer where the content will be read from. Note that |
| | | * @{code {@link #close()} this reader will also dispose the buffer. |
| | | */ |
| | | ASN1BufferReader(final int maxElementSize, final Buffer buffer) { |
| | | this.readLimiter = new RootSequenceLimiter(); |
| | | this.buffer = buffer; |
| | | this.buffer.allowBufferDispose(false); |
| | | this.maxElementSize = maxElementSize; |
| | | } |
| | | |
| | | /** |
| | | * Closes this ASN.1 reader and the underlying stream. |
| | | * Closes this ASN.1 reader and the underlying {@link Buffer}. |
| | | * |
| | | * @throws IOException |
| | | * if an I/O error occurs |
| | | */ |
| | | @Override |
| | | public void close() throws IOException { |
| | | // Nothing to do |
| | | buffer.tryDispose(); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | void ensureAdditionalCapacity(final int size) { |
| | | final int newCount = outBuffer.position() + size; |
| | | if (newCount > outBuffer.capacity()) { |
| | | outBuffer = memoryManager.reallocate(outBuffer, Math.max(outBuffer.capacity() << 1, newCount)); |
| | | if (newCount > outBuffer.limit()) { |
| | | outBuffer = memoryManager.reallocate(outBuffer, Math.max(outBuffer.limit() << 1, newCount)); |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.glassfish.grizzly.memory.HeapMemoryManager; |
| | | import org.glassfish.grizzly.nio.transport.TCPNIOTransport; |
| | | import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder; |
| | | import org.glassfish.grizzly.strategies.SameThreadIOStrategy; |
| | |
| | | // Enabled by default. |
| | | builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr)); |
| | | } |
| | | // Default heap manager has changed in grizzly 2.3.27 to the more memory consuming PooledMemoryManager |
| | | builder.setMemoryManager(new HeapMemoryManager()); |
| | | |
| | | final TCPNIOTransport transport = builder.build(); |
| | | |
| | |
| | | buffer.mark(); |
| | | if (!reader.elementAvailable()) { |
| | | buffer.reset(); |
| | | return ctx.getStopAction(buffer); |
| | | // We need to create a duplicate because buffer will be closed by the reader (try-with-resources) |
| | | return ctx.getStopAction(buffer.duplicate()); |
| | | } |
| | | final int length = reader.peekLength(); |
| | | final Buffer remainder = buffer.remaining() > length ? buffer.split(buffer.position() + length) : null; |
| | | buffer.reset(); |
| | | try (final ASN1BufferReader packetReader = new ASN1BufferReader(maxASN1ElementSize, buffer)) { |
| | | try (final ASN1BufferReader packetReader = |
| | | new ASN1BufferReader(maxASN1ElementSize, buffer.asReadOnlyBuffer())) { |
| | | final LDAPReader<? extends ASN1Reader> ldapReader = LDAP.getReader(packetReader, decodeOptions); |
| | | ctx.setMessage(null); |
| | | ldapReader.readMessage(handler); |
| | | } finally { |
| | | buffer.tryDispose(); |
| | | } |
| | | return ctx.getInvokeAction(remainder); |
| | | } catch (IOException e) { |
| | |
| | | |
| | | @Override |
| | | public void request(long n) { |
| | | if (BackpressureHelper.add(pendingRequests, n) == 0 && ctx != null) { |
| | | ctx.resumeNext(); |
| | | final FilterChainContext immutableRef = ctx; |
| | | if (BackpressureHelper.add(pendingRequests, n) == 0 && immutableRef != null) { |
| | | immutableRef.resumeNext(); |
| | | ctx = null; |
| | | } |
| | | } |
| | |
| | | try { |
| | | final Buffer buffer = ctx.getMessage(); |
| | | try (final ASN1BufferReader reader = new ASN1BufferReader(maxASN1ElementSize, buffer)) { |
| | | buffer.mark(); |
| | | // Due to a bug in grizzly's ByteBufferWrapper.split(), we can't use byteBuffer.mark() |
| | | final int mark = buffer.position(); |
| | | if (!reader.elementAvailable()) { |
| | | buffer.reset(); |
| | | return ctx.getStopAction(buffer); |
| | | buffer.position(mark); |
| | | // We need to create a duplicate because buffer will be closed by the reader (try-with-resources) |
| | | return ctx.getStopAction(buffer.duplicate()); |
| | | } |
| | | final int length = reader.peekLength(); |
| | | if (length > maxASN1ElementSize) { |
| | | buffer.reset(); |
| | | buffer.position(mark); |
| | | throw DecodeException.fatalError( |
| | | ERR_LDAP_CLIENT_DECODE_MAX_REQUEST_SIZE_EXCEEDED.get(length, maxASN1ElementSize)); |
| | | } |
| | | final Buffer remainder = (buffer.remaining() > length) |
| | | ? buffer.split(buffer.position() + length) |
| | | : null; |
| | | buffer.reset(); |
| | | ctx.setMessage(decodePacket(new ASN1BufferReader(maxASN1ElementSize, buffer))); |
| | | buffer.position(mark); |
| | | ctx.setMessage(decodePacket(new ASN1BufferReader(maxASN1ElementSize, buffer.asReadOnlyBuffer()))); |
| | | buffer.tryDispose(); |
| | | return ctx.getInvokeAction(remainder); |
| | | } |
| | |
| | | import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage; |
| | | import org.glassfish.grizzly.CompletionHandler; |
| | | import org.glassfish.grizzly.Connection; |
| | | import org.glassfish.grizzly.WriteHandler; |
| | | import org.reactivestreams.Subscriber; |
| | | import org.reactivestreams.Subscription; |
| | | |
| | |
| | | return; |
| | | } |
| | | upstream = s; |
| | | connection.notifyCanWrite(new WriteHandler() { |
| | | @Override |
| | | public void onWritePossible() throws Exception { |
| | | final Subscription sub = upstream; |
| | | if (sub != null) { |
| | | sub.request(1); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void onError(final Throwable error) { |
| | | LdapResponseMessageWriter.this.onError(error); |
| | | } |
| | | }); |
| | | // We're requesting two response to allow overlap between async I/O and response computation. |
| | | // (allows to generate a response while we're waiting for the previous message to be written) |
| | | upstream.request(2); |
| | | } |
| | | |
| | | @Override |
| | | public void onNext(final LdapResponseMessage message) { |
| | | connection.write(message).addCompletionHandler(this); |
| | | connection.write(message, this); |
| | | } |
| | | |
| | | @Override |
| | |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.glassfish.grizzly.memory.PooledMemoryManager; |
| | | import org.glassfish.grizzly.nio.transport.TCPNIOTransport; |
| | | import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder; |
| | | import org.glassfish.grizzly.strategies.SameThreadIOStrategy; |
| | |
| | | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | static final ServerTCPNIOTransport SERVER_TRANSPORT = new ServerTCPNIOTransport(); |
| | | private static final long MB = 1024 * 1024; |
| | | |
| | | private ServerTCPNIOTransport() { |
| | | // Prevent instantiation. |
| | |
| | | // Enabled by default. |
| | | builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr)); |
| | | } |
| | | float heapPercent; |
| | | if (Runtime.getRuntime().maxMemory() < 1024 * MB) { |
| | | // Low heap |
| | | heapPercent = 0.01f; |
| | | } else { |
| | | // Compute a percentage to try to reach roughly 64Mb (big enough (tm)) |
| | | heapPercent = 64f * MB / Runtime.getRuntime().maxMemory(); |
| | | } |
| | | // Force usage of PooledMemoryManager which allows to use grizzly's buffers across threads. |
| | | builder.setMemoryManager(new PooledMemoryManager( |
| | | 1024, // Initial buffer size |
| | | 3, // Number of pools (with growing factor below this give us pools of 1K, 4K, 16k buffers) |
| | | 4, // Growing factor to apply on the size of the buffer polled by the next pool |
| | | selectorThreadCount, // Number of pool slices that every pool will stripe allocation requests across |
| | | heapPercent, // The percentage of the heap that this manager will use when populating the pools (5%) |
| | | 1f, // The percentage of buffers to be pre-allocated during MemoryManager initialization (100%) |
| | | true // true to use direct buffers or false to use heap buffers |
| | | )); |
| | | |
| | | final TCPNIOTransport transport = builder.build(); |
| | | |
| | |
| | | |
| | | <dependency> |
| | | <groupId>org.forgerock.commons</groupId> |
| | | <artifactId>forgerock-audit-handler-splunk</artifactId> |
| | | </dependency> |
| | | |
| | | <dependency> |
| | | <groupId>org.forgerock.commons</groupId> |
| | | <artifactId>forgerock-audit-handler-jms</artifactId> |
| | | </dependency> |
| | | |
| | | <dependency> |
| | | <groupId>org.forgerock.commons</groupId> |
| | | <artifactId>forgerock-audit-json</artifactId> |
| | | </dependency> |
| | | |
| | |
| | | <Export-Package>org.forgerock.opendj.server.embedded</Export-Package> |
| | | <!-- Import je changelog since it is not shipped in the main jar --> |
| | | <Import-Package> |
| | | org.opends.server.replication.server.changelog.je, |
| | | org.opends.server.replication.server.changelog.je;resolution:=optional, |
| | | com.sleepycat.je*;resolution:=optional, |
| | | ${opendj.osgi.import} |
| | | </Import-Package> |
| | | <Embed-Dependency> |
| | | forgerock-persistit-core, |
| | | <!--je,--> |
| | | jcip-annotations |
| | | </Embed-Dependency> |
| | | </instructions> |
| | | </configuration> |
| | | </execution> |
| | |
| | | |
| | | import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher; |
| | | import static org.forgerock.opendj.io.LDAP.*; |
| | | import static org.forgerock.util.Utils.closeSilently; |
| | | import static org.opends.messages.CoreMessages.*; |
| | | import static org.opends.messages.ProtocolMessages.*; |
| | | import static org.opends.server.loggers.AccessLogger.logDisconnect; |
| | |
| | | Flowable.create(new FlowableOnSubscribe<Response>() { |
| | | @Override |
| | | public void subscribe(FlowableEmitter<Response> emitter) throws Exception { |
| | | try { |
| | | processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter); |
| | | } finally { |
| | | // We don't need the ASN1Reader anymore. |
| | | closeSilently(message.getContent()); |
| | | } |
| | | } |
| | | }, BackpressureStrategy.ERROR))) |
| | | .onNext(new Consumer<Response>() { |
| | |
| | | return (int) currentConfig.getMaxRequestSize(); |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the size in bytes of the LDAP response message write buffer defined for this connection handler. |
| | | * |
| | | * @return The size in bytes of the LDAP response message write buffer. |
| | | */ |
| | | public int getBufferSize() { |
| | | return (int) currentConfig.getBufferSize(); |
| | | } |
| | | |
| | | @Override |
| | | public String getProtocol() { |
| | | return protocol; |
| | |
| | | setProcessingStopTime(); |
| | | logBindResponse(this); |
| | | |
| | | // Send the bind response to the client. |
| | | clientConnection.sendResponse(this); |
| | | |
| | | // If the bind processing is finished, then unset the "bind in progress" |
| | | // flag to allow other operations to be processed on the connection. |
| | | if (getResultCode() != ResultCode.SASL_BIND_IN_PROGRESS) |
| | |
| | | } |
| | | clientConnection.finishBind(); |
| | | |
| | | clientConnection.sendResponse(this); |
| | | |
| | | invokePostResponsePlugins(workflowExecuted); |
| | | } |
| | | } |
| | |
| | | <forgerock-build-tools.version>1.0.2</forgerock-build-tools.version> |
| | | <forgerock-doc-plugin.version>3.2.2-SNAPSHOT</forgerock-doc-plugin.version> |
| | | <freemarker.version>2.3.24-incubating</freemarker.version> |
| | | <grizzly-framework.version>2.3.24</grizzly-framework.version> |
| | | <grizzly-framework.version>2.3.28</grizzly-framework.version> |
| | | <metrics-core.version>3.1.2</metrics-core.version> |
| | | |
| | | <!-- OSGi bundles properties --> |