Fix a potential deadlock that may occur when large searches and modify operations are performed simultaneously on the same connection. Use a single channel write lock per connection and share it across the thread-local ASN1 writers.
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.protocols.asn1; |
| | | |
| | |
| | | import java.io.OutputStream; |
| | | import java.nio.channels.ReadableByteChannel; |
| | | import java.nio.channels.WritableByteChannel; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | |
| | | import org.opends.server.types.ByteSequence; |
| | | import org.opends.server.types.ByteString; |
| | |
| | | * |
| | | * @param channel |
| | | * The writable byte channel. |
| | | * @param writeLock |
| | | * The write lock to use when flushing to the destination. |
| | | * @return The new ASN.1 writer. |
| | | */ |
| | | public static ASN1Writer getWriter(WritableByteChannel channel) |
| | | public static ASN1Writer getWriter(WritableByteChannel channel, |
| | | ReentrantLock writeLock) |
| | | { |
| | | return new ASN1ByteChannelWriter(channel, 4096); |
| | | return new ASN1ByteChannelWriter(channel, writeLock, 4096); |
| | | } |
| | | |
| | | |
| | |
| | | * |
| | | * @param channel |
| | | * The writable byte channel. |
| | | * @param writeLock |
| | | * The write lock to use when flushing to the destination. |
| | | * @param bufferSize |
| | | * The buffer size to use when writing to the channel. |
| | | * @return The new ASN.1 writer. |
| | | */ |
| | | public static ASN1Writer getWriter(WritableByteChannel channel, |
| | | ReentrantLock writeLock, |
| | | int bufferSize) |
| | | { |
| | | return new ASN1ByteChannelWriter(channel, bufferSize); |
| | | return new ASN1ByteChannelWriter(channel, writeLock, bufferSize); |
| | | } |
| | | |
| | | |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.protocols.asn1; |
| | | |
| | |
| | | * Constructs a new ASN1ByteChannelWriter. |
| | | * |
| | | * @param byteChannel The WritableByteChannel to write to. |
| | | * @param writeLock The write lock to use when flushing to the destination. |
| | | * @param writeBufferSize The NIO ByteBuffer size. |
| | | */ |
| | | ASN1ByteChannelWriter(WritableByteChannel byteChannel, |
| | | int writeBufferSize) |
| | | ReentrantLock writeLock, |
| | | int writeBufferSize) |
| | | { |
| | | this.byteChannel = byteChannel; |
| | | this.byteBuffer = ByteBuffer.allocate(writeBufferSize); |
| | | this.flushLock = new ReentrantLock(false); |
| | | this.flushLock = writeLock; |
| | | |
| | | ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(); |
| | | this.writer = new ASN1OutputStreamWriter(bufferStream); |
| | |
| | | public void flush() throws IOException |
| | | { |
| | | byteBuffer.flip(); |
| | | if (!flushLock.isHeldByCurrentThread()) |
| | | { |
| | | flushLock.lock(); |
| | | } |
| | | try |
| | | { |
| | | if (!flushLock.isHeldByCurrentThread()) |
| | | { |
| | | flushLock.lock(); |
| | | } |
| | | byteChannel.write(byteBuffer); |
| | | } |
| | | finally |
| | |
| | | { |
| | | flushLock.lock(); |
| | | } |
| | | while(byteBuffer.hasRemaining()) |
| | | try |
| | | { |
| | | byteChannel.write(byteBuffer); |
| | | while (byteBuffer.hasRemaining()) |
| | | { |
| | | byteChannel.write(byteBuffer); |
| | | } |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | flushLock.unlock(); |
| | | throw e; |
| | | } |
| | | byteBuffer.clear(); |
| | | } |
| | |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | |
| | | |
| | | private final RedirectingByteChannel saslChannel; |
| | | private final RedirectingByteChannel tlsChannel; |
| | | private final ReentrantLock writeLock; |
| | | private volatile ConnectionSecurityProvider activeProvider = null; |
| | | private volatile ConnectionSecurityProvider tlsPendingProvider = null; |
| | | private volatile ConnectionSecurityProvider saslPendingProvider = null; |
| | |
| | | this.asn1Reader = |
| | | ASN1.getReader(saslChannel, APPLICATION_BUFFER_SIZE, connectionHandler |
| | | .getMaxRequestSize()); |
| | | writeLock = new ReentrantLock(); |
| | | |
| | | asn1WriterMap = new ConcurrentHashMap<Thread,ASN1Writer>(); |
| | | |
| | |
| | | if (isSecure()) |
| | | { |
| | | int appBufSize = activeProvider.getAppBufSize(); |
| | | asn1Writer = ASN1.getWriter(saslChannel, appBufSize); |
| | | asn1Writer = ASN1.getWriter(saslChannel, writeLock, appBufSize); |
| | | } |
| | | else |
| | | { |
| | | asn1Writer = ASN1.getWriter(saslChannel, |
| | | asn1Writer = ASN1.getWriter(saslChannel, writeLock, |
| | | APPLICATION_BUFFER_SIZE); |
| | | } |
| | | asn1WriterMap.put(currentThread, asn1Writer); |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.protocols.asn1; |
| | | |
| | |
| | | import java.nio.channels.Channels; |
| | | import java.nio.channels.ReadableByteChannel; |
| | | import java.nio.channels.WritableByteChannel; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | |
| | | /** |
| | | * Test class for ASN1ByteChannelWriter |
| | |
| | | { |
| | | private ByteArrayOutputStream outStream = new ByteArrayOutputStream(); |
| | | private WritableByteChannel outChannel = Channels.newChannel(outStream); |
| | | private ASN1Writer writer = new ASN1ByteChannelWriter(outChannel, 500); |
| | | private ASN1Writer writer = new ASN1ByteChannelWriter(outChannel, |
| | | new ReentrantLock(), 500); |
| | | |
| | | @Override |
| | | ASN1Writer getWriter() throws IOException |